This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new e8d5350 [ISSUE #59] Support rich initialization modes of RocketMQ source (#62)
e8d5350 is described below
commit e8d535099ccc1c999e13b120d5de119df999a724
Author: 高思伟 <[email protected]>
AuthorDate: Mon Oct 10 15:09:14 2022 +0800
[ISSUE #59] Support rich initialization modes of RocketMQ source (#62)
1. Add rich initialization modes similar to those of the Kafka connector
2. Add metrics in the source phase
Co-authored-by: 高思伟 <[email protected]>
---
README.md | 38 +++-
pom.xml | 14 ++
.../flink/legacy/RocketMQSourceFunction.java | 232 +++++++++++++++------
.../OffsetResetStrategy.java} | 19 +-
.../TestUtils.java => config/StartupMode.java} | 20 +-
.../SimpleStringDeserializationSchema.java | 39 ++++
.../flink/legacy/common/util/MetricUtils.java | 18 ++
.../flink/legacy/common/util/TestUtils.java | 7 +
.../sourceFunction/RocketMQSourceFunctionTest.java | 88 ++++++++
9 files changed, 387 insertions(+), 88 deletions(-)
diff --git a/README.md b/README.md
index fa53dd7..be889eb 100644
--- a/README.md
+++ b/README.md
@@ -73,7 +73,12 @@ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironm
Properties producerProps = new Properties();
producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,
"localhost:9876");
- env.addSource(new RocketMQSourceFunction(new
SimpleKeyValueDeserializationSchema("id", "address"), consumerProps))
+ RocketMQSourceFunction<Map<Object,Object>> source = new
RocketMQSourceFunction(
+ new SimpleKeyValueDeserializationSchema("id", "address"),
consumerProps);
+ // Use group offsets.
+ // If there is no committed offset, the consumer starts from the latest offset.
+ source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST);
+ env.addSource(source)
.name("rocketmq-source")
.setParallelism(2)
.process(new ProcessFunction<Map, Map>() {
@@ -123,13 +128,40 @@ The following configurations are all from the class
`org.apache.rocketmq.flink.l
| consumer.group | consumer group *Required* | null |
| consumer.topic | consumer topic *Required* | null |
| consumer.tag | consumer topic tag | * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the
server | latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when
`consumer.offset.reset.to=timestamp` was set | `System.currentTimeMillis()`
|
| consumer.offset.persist.interval | auto commit offset interval |
5000 |
| consumer.pull.thread.pool.size | consumer pull thread pool size | 20
|
| consumer.batch.size | consumer messages batch size | 32 |
| consumer.delay.when.message.not.found | the delay time when messages were
not found | 10 |
+### Consumer Strategy
+
+```java
+RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(
+ new SimpleStringDeserializationSchema(), props);
+HashMap<MessageQueue, Long> brokerMap = new HashMap<>();
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-a", 1),
201L);
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-c", 3),
123L);
+source.setStartFromSpecificOffsets(brokerMap);
+```
+`RocketMQSourceFunction` offers five initialization policies:
+* setStartFromEarliest
+* setStartFromLatest
+* setStartFromTimeStamp with timestamp
+* setStartFromGroupOffsets with `OffsetResetStrategy`
+* setStartFromSpecificOffsets
+
+| STRATEGY                    | DESCRIPTION                                                  |
+| --------------------------- | ------------------------------------------------------------ |
+| EARLIEST                    | Consume from the earliest offset after a restart with no state. |
+| LATEST                      | Consume from the latest offset after a restart with no state. |
+| TIMESTAMP                   | Consume from the offset closest to the given timestamp in each broker's queue. |
+| GROUP_OFFSETS with LATEST   | If the broker has a committed offset, consume from the next offset; otherwise consume from the latest offset. |
+| GROUP_OFFSETS with EARLIEST | If the broker has a committed offset, consume from the next offset; otherwise consume from the earliest offset. This is useful when brokers are added to the cluster. |
+| SPECIFIC_OFFSETS            | Consume from the specified offsets in the broker's queues. Queues without a specified offset fall back to the group offsets. |
+
+**Attention**
+
+These strategies take effect only when the Flink job starts without state. If
+the job recovers from a checkpoint, the offsets are initialized from the
+stored state instead.
## RocketMQ SQL Connector
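
The README hunk above states two rules: the startup strategies apply only when the job starts without state, and in SPECIFIC_OFFSETS mode a queue without a specified offset falls back to the group offset. The following is a minimal sketch of those two rules only, under stated assumptions: `InitialOffsetsSketch`, the use of plain strings as queue names, and the `groupOffsetLookup` callback are hypothetical stand-ins and are not part of the connector.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical model of "restored state wins over the startup strategy"; not connector code. */
final class InitialOffsetsSketch {

    /**
     * @param assignedQueues queue names handled by this subtask (plain strings for brevity)
     * @param restoredOffsets offsets from a checkpoint, or null when the job starts without state
     * @param specificOffsets user-specified offsets (SPECIFIC_OFFSETS mode)
     * @param groupOffsetLookup stand-in for asking the broker for the committed group offset
     */
    static Map<String, Long> initialOffsets(
            List<String> assignedQueues,
            Map<String, Long> restoredOffsets,
            Map<String, Long> specificOffsets,
            ToLongFunction<String> groupOffsetLookup) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String queue : assignedQueues) {
            if (restoredOffsets != null) {
                // Recovering from a checkpoint: the startup strategy is ignored.
                Long restored = restoredOffsets.get(queue);
                if (restored != null) {
                    result.put(queue, restored);
                }
            } else if (specificOffsets.containsKey(queue)) {
                result.put(queue, specificOffsets.get(queue));
            } else {
                // No offset specified for this queue: fall back to the group offset.
                result.put(queue, groupOffsetLookup.applyAsLong(queue));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("broker-a:1", "broker-c:3", "broker-c:4");
        Map<String, Long> specific = new HashMap<>();
        specific.put("broker-a:1", 201L);
        specific.put("broker-c:3", 123L);

        // Fresh start: two queues use the specified offsets, the third uses the group offset.
        System.out.println(initialOffsets(queues, null, specific, q -> 77L));

        // Recovery: only the restored offsets are used.
        Map<String, Long> restored = new HashMap<>();
        restored.put("broker-a:1", 900L);
        System.out.println(initialOffsets(queues, restored, specific, q -> 77L));
    }
}
```

In the actual source function the keys are `MessageQueue` objects and the group offset comes from the pull consumer; the sketch only makes the precedence order explicit.
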
diff --git a/pom.xml b/pom.xml
index 1b725e1..8617bd2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -315,6 +315,20 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index db2ab15..b078056 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -23,6 +23,8 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
+import org.apache.rocketmq.flink.legacy.common.config.StartupMode;
import
org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
import org.apache.rocketmq.flink.legacy.common.util.RetryUtil;
@@ -76,13 +78,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_BATCH_SIZE;
-import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
-import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
-import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
-import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;
import static
org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
-import static
org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong;
/**
* The RocketMQSource is based on RocketMQ pull consumer mode, and provides
exactly once reliability
@@ -116,12 +113,33 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
private Properties props;
private String topic;
private String group;
- private long startMessageOffset;
private transient volatile boolean restored;
private transient boolean enableCheckpoint;
private volatile Object checkPointLock;
private Meter tpsMetric;
+ private MetricUtils.TimestampGauge fetchDelay = new
MetricUtils.TimestampGauge();
+ private MetricUtils.TimestampGauge emitDelay = new
MetricUtils.TimestampGauge();
+
+ /** The startup mode for the consumer (default is {@link
StartupMode#GROUP_OFFSETS}). */
+ private StartupMode startMode = StartupMode.GROUP_OFFSETS;
+
+ /**
+ * Fallback used when {@link StartupMode#GROUP_OFFSETS} finds no committed offset: the
+ * OffsetResetStrategy decides where consumption starts.
+ */
+ private OffsetResetStrategy offsetResetStrategy =
OffsetResetStrategy.LATEST;
+
+ /**
+ * Specific startup offsets; only relevant when startup mode is {@link
+ * StartupMode#SPECIFIC_OFFSETS}.
+ */
+ private Map<MessageQueue, Long> specificStartupOffsets;
+
+ /**
+ * Specific startup timestamp; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.
+ */
+ private long specificTimeStamp;
public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> schema,
Properties props) {
this.schema = schema;
@@ -135,11 +153,6 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
- this.startMessageOffset =
- props.containsKey(RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET)
- ? Long.parseLong(
-
props.getProperty(RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET))
- : DEFAULT_START_MESSAGE_OFFSET;
Validate.notEmpty(topic, "Consumer topic can not be empty");
Validate.notEmpty(group, "Consumer group can not be empty");
@@ -214,14 +227,13 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
getRuntimeContext()
.getMetricGroup()
.meter(MetricUtils.METRICS_TPS, new
MeterView(outputCounter, 60));
- }
- @Override
- public void run(SourceContext context) throws Exception {
- String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
- String tag =
- props.getProperty(RocketMQConfig.CONSUMER_TAG,
RocketMQConfig.DEFAULT_CONSUMER_TAG);
- int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE,
DEFAULT_CONSUMER_BATCH_SIZE);
+ getRuntimeContext()
+ .getMetricGroup()
+ .gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay);
+ getRuntimeContext()
+ .getMetricGroup()
+ .gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, emitDelay);
final RuntimeContext ctx = getRuntimeContext();
// The lock that guarantees that record emission and state updates are
atomic,
@@ -229,7 +241,22 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
int taskNumber = ctx.getNumberOfParallelSubtasks();
int taskIndex = ctx.getIndexOfThisSubtask();
log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}",
taskNumber, taskIndex);
+ Collection<MessageQueue> totalQueues =
consumer.fetchSubscribeMessageQueues(topic);
+ messageQueues =
+ RocketMQUtils.allocate(totalQueues, taskNumber,
ctx.getIndexOfThisSubtask());
+ // If the job recovers from the state, the state has already contained
the offsets of last
+ // commit.
+ if (!restored) {
+ initOffsets(messageQueues);
+ }
+ }
+ @Override
+ public void run(SourceContext context) throws Exception {
+ String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
+ String tag =
+ props.getProperty(RocketMQConfig.CONSUMER_TAG,
RocketMQConfig.DEFAULT_CONSUMER_TAG);
+ int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE,
DEFAULT_CONSUMER_BATCH_SIZE);
timer.scheduleAtFixedRate(
() -> {
//
context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
@@ -238,10 +265,6 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
5,
5,
TimeUnit.SECONDS);
-
- Collection<MessageQueue> totalQueues =
consumer.fetchSubscribeMessageQueues(topic);
- messageQueues =
- RocketMQUtils.allocate(totalQueues, taskNumber,
ctx.getIndexOfThisSubtask());
for (MessageQueue mq : messageQueues) {
this.executor.execute(
() ->
@@ -249,8 +272,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
() -> {
while (runningChecker.isRunning()) {
try {
- long offset =
getMessageQueueOffset(mq);
-
+ long offset =
offsetTable.get(mq);
PullResult pullResult;
if (StringUtils.isEmpty(sql)) {
pullResult =
@@ -271,6 +293,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
case FOUND:
List<MessageExt>
messages =
pullResult.getMsgFoundList();
+ long fetchTime =
System.currentTimeMillis();
for (MessageExt msg :
messages) {
byte[] key =
msg.getKeys() != null
@@ -299,12 +322,24 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
context.collectWithTimestamp(
data,
msg.getBornTimestamp());
+ long emitTime =
+         System.currentTimeMillis();
// update max
eventTime per queue
//
waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
waterMarkForAll.extractTimestamp(
msg.getBornTimestamp());
tpsMetric.markEvent();
+ long eventTime =
+         msg.getStoreTimestamp();
+ fetchDelay.report(
+         Math.abs(
+                 fetchTime
+                         - eventTime));
+ emitDelay.report(
+         Math.abs(
+                 emitTime
+                         - eventTime));
}
}
found = true;
@@ -359,46 +394,121 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
}
}
- private long getMessageQueueOffset(MessageQueue mq) throws
MQClientException {
- Long offset = offsetTable.get(mq);
- if (offset != null) {
- return offset;
- }
- // restoredOffsets(unionOffsetStates) is the restored global union
state;
- // should only snapshot mqs that actually belong to us
- if (startMessageOffset == DEFAULT_START_MESSAGE_OFFSET) {
- // fetchConsumeOffset from broker
- offset = consumer.fetchConsumeOffset(mq, false);
- if (!restored && offset < 0) {
- String initialOffset =
- props.getProperty(
- RocketMQConfig.CONSUMER_OFFSET_RESET_TO,
CONSUMER_OFFSET_LATEST);
- switch (initialOffset) {
- case CONSUMER_OFFSET_EARLIEST:
- offset = consumer.minOffset(mq);
- break;
- case CONSUMER_OFFSET_LATEST:
- offset = consumer.maxOffset(mq);
- break;
- case CONSUMER_OFFSET_TIMESTAMP:
- offset =
- consumer.searchOffset(
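+ /**
+ * Initializes offsets from the broker. Only called when the Flink job starts without state.
+ *
+ * @param messageQueues the message queues assigned to this subtask
+ * @throws MQClientException if an offset cannot be fetched from the broker
+ */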
+ private void initOffsets(List<MessageQueue> messageQueues) throws
MQClientException {
+ for (MessageQueue mq : messageQueues) {
+ long offset;
+ switch (startMode) {
+ case LATEST:
+ offset = consumer.maxOffset(mq);
+ break;
+ case EARLIEST:
+ offset = consumer.minOffset(mq);
+ break;
+ case GROUP_OFFSETS:
+ offset = consumer.fetchConsumeOffset(mq, false);
+ // fetchConsumeOffset returns the min offset when the consumer group
+ // joins for the first time, and a negative number if an exception is
+ // caught while fetching from the broker.
+ // To consume from the earliest offset, use OffsetResetStrategy.EARLIEST.
+ if (offset <= 0) {
+ switch (offsetResetStrategy) {
+ case LATEST:
+ offset = consumer.maxOffset(mq);
+ log.info(
+ "current consumer thread:{} has no
committed offset,use Strategy:{} instead",
mq,
- getLong(
- props,
-
RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP,
- System.currentTimeMillis()));
- break;
- default:
- throw new IllegalArgumentException(
- "Unknown value for CONSUMER_OFFSET_RESET_TO.");
- }
+ offsetResetStrategy);
+ break;
+ case EARLIEST:
+ log.info(
+ "current consumer thread:{} has no
committed offset,use Strategy:{} instead",
+ mq,
+ offsetResetStrategy);
+ offset = consumer.minOffset(mq);
+ break;
+ default:
+ break;
+ }
+ }
+ break;
+ case TIMESTAMP:
+ offset = consumer.searchOffset(mq, specificTimeStamp);
+ break;
+ case SPECIFIC_OFFSETS:
+ if (specificStartupOffsets == null) {
+ throw new RuntimeException(
+ "StartMode is specific_offsets.But none
offsets has been specified");
+ }
+ Long specificOffset = specificStartupOffsets.get(mq);
+ if (specificOffset != null) {
+ offset = specificOffset;
+ } else {
+ offset = consumer.fetchConsumeOffset(mq, false);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "current startMode is not supported" + startMode);
}
- } else {
- offset = startMessageOffset;
+ log.info(
+ "current consumer queue:{} start from offset of: {}",
+ mq.getBrokerName() + "-" + mq.getQueueId(),
+ offset);
+ offsetTable.put(mq, offset);
}
- offsetTable.put(mq, offset);
- return offsetTable.get(mq);
+ }
+
+ /** consume from the min offset at every restart with no state */
+ public RocketMQSourceFunction<OUT> setStartFromEarliest() {
+ this.startMode = StartupMode.EARLIEST;
+ return this;
+ }
+
+ /** consume from the max offset of each broker's queue at every restart
with no state */
+ public RocketMQSourceFunction<OUT> setStartFromLatest() {
+ this.startMode = StartupMode.LATEST;
+ return this;
+ }
+
+ /** Consume from the offset closest to the given timestamp. */
+ public RocketMQSourceFunction<OUT> setStartFromTimeStamp(long timeStamp) {
+ this.startMode = StartupMode.TIMESTAMP;
+ this.specificTimeStamp = timeStamp;
+ return this;
+ }
+
+ /** Consume from the group offsets stored in the brokers. */
+ public RocketMQSourceFunction<OUT> setStartFromGroupOffsets() {
+ this.startMode = StartupMode.GROUP_OFFSETS;
+ return this;
+ }
+
+ /**
+ * Consume from the group offsets stored in the brokers. If there is no committed offset,
+ * the given {@link OffsetResetStrategy} provides the initialization policy.
+ */
+ public RocketMQSourceFunction<OUT> setStartFromGroupOffsets(
+ OffsetResetStrategy offsetResetStrategy) {
+ this.startMode = StartupMode.GROUP_OFFSETS;
+ this.offsetResetStrategy = offsetResetStrategy;
+ return this;
+ }
+
+ /**
+ * Consume from the specified offsets. Queues without a specified offset fall back to the
+ * group offsets.
+ */
+ public RocketMQSourceFunction<OUT> setStartFromSpecificOffsets(
+ Map<MessageQueue, Long> specificOffsets) {
+ this.specificStartupOffsets = specificOffsets;
+ this.startMode = StartupMode.SPECIFIC_OFFSETS;
+ return this;
}
private void updateMessageQueueOffset(MessageQueue mq, long offset) throws
MQClientException {
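
The `initOffsets` switch in the hunk above can be read as a pure decision function over four per-queue values: min offset, max offset, committed group offset, and the offset found for a timestamp. The sketch below models that reading under stated assumptions: `StartOffsetSketch`, `QueueView` and `resolve` are hypothetical names, the enums are local copies for self-containment, and the broker calls are replaced by precomputed numbers. It follows the diff in treating a committed offset `<= 0` as "no committed offset".

```java
/** Hypothetical, simplified model of the startup-offset decision; not the connector's API. */
final class StartOffsetSketch {

    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, TIMESTAMP, SPECIFIC_OFFSETS }

    enum OffsetResetStrategy { LATEST, EARLIEST }

    /** Stand-in for the values the real consumer would look up on the broker for one queue. */
    static final class QueueView {
        final long minOffset;
        final long maxOffset;
        final long committedOffset; // <= 0 is treated as "no committed offset", as in the diff
        final long offsetAtTimestamp;

        QueueView(long minOffset, long maxOffset, long committedOffset, long offsetAtTimestamp) {
            this.minOffset = minOffset;
            this.maxOffset = maxOffset;
            this.committedOffset = committedOffset;
            this.offsetAtTimestamp = offsetAtTimestamp;
        }
    }

    /** Resolves the first offset to pull for one queue; specificOffset may be null. */
    static long resolve(
            StartupMode mode, OffsetResetStrategy reset, QueueView q, Long specificOffset) {
        switch (mode) {
            case EARLIEST:
                return q.minOffset;
            case LATEST:
                return q.maxOffset;
            case TIMESTAMP:
                return q.offsetAtTimestamp;
            case GROUP_OFFSETS:
                if (q.committedOffset > 0) {
                    return q.committedOffset;
                }
                // No committed offset: the reset strategy picks the starting point.
                return reset == OffsetResetStrategy.EARLIEST ? q.minOffset : q.maxOffset;
            case SPECIFIC_OFFSETS:
                // Unspecified queues fall back to the group offset, without a reset strategy.
                return specificOffset != null ? specificOffset : q.committedOffset;
            default:
                throw new IllegalArgumentException("Unsupported startup mode: " + mode);
        }
    }

    public static void main(String[] args) {
        QueueView fresh = new QueueView(100L, 500L, -1L, 250L);
        QueueView committed = new QueueView(100L, 500L, 320L, 250L);
        System.out.println(
                resolve(StartupMode.GROUP_OFFSETS, OffsetResetStrategy.EARLIEST, fresh, null));
        System.out.println(
                resolve(StartupMode.GROUP_OFFSETS, OffsetResetStrategy.LATEST, committed, null));
        System.out.println(
                resolve(StartupMode.SPECIFIC_OFFSETS, OffsetResetStrategy.LATEST, fresh, 201L));
    }
}
```

One consequence visible in this model: in SPECIFIC_OFFSETS mode a queue with neither a specified nor a committed offset keeps whatever the group-offset lookup returned, because the reset strategy is consulted only in GROUP_OFFSETS mode.
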
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/OffsetResetStrategy.java
similarity index 65%
copy from
src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
copy to
src/main/java/org/apache/rocketmq/flink/legacy/common/config/OffsetResetStrategy.java
index 70c26d9..a48e6d4 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/OffsetResetStrategy.java
@@ -14,18 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.flink.legacy.common.util;
-import java.lang.reflect.Field;
+package org.apache.rocketmq.flink.legacy.common.config;
-public class TestUtils {
- public static void setFieldValue(Object obj, String fieldName, Object
value) {
- try {
- Field field = obj.getClass().getDeclaredField(fieldName);
- field.setAccessible(true);
- field.set(obj, value);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+/** Config for #{@link StartupMode#GROUP_OFFSETS}. */
+public enum OffsetResetStrategy {
+ /** If no group offset is found, the consumer starts from the latest offset. */
+ LATEST,
+ /** If no group offset is found, the consumer starts from the earliest offset. */
+ EARLIEST
}
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java
similarity index 65%
copy from
src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
copy to
src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java
index 70c26d9..163dae4 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java
@@ -14,18 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.flink.legacy.common.util;
-import java.lang.reflect.Field;
+package org.apache.rocketmq.flink.legacy.common.config;
-public class TestUtils {
- public static void setFieldValue(Object obj, String fieldName, Object
value) {
- try {
- Field field = obj.getClass().getDeclaredField(fieldName);
- field.setAccessible(true);
- field.set(obj, value);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+/** RocketMQ startup mode. */
+public enum StartupMode {
+ EARLIEST,
+ LATEST,
+ GROUP_OFFSETS,
+ TIMESTAMP,
+ SPECIFIC_OFFSETS
}
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleStringDeserializationSchema.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleStringDeserializationSchema.java
new file mode 100644
index 0000000..f59b961
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleStringDeserializationSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+
+/** deserialize the message body to string */
+public class SimpleStringDeserializationSchema implements
KeyValueDeserializationSchema<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String deserializeKeyAndValue(byte[] key, byte[] value) {
+ String v = value != null ? new String(value, StandardCharsets.UTF_8) :
null;
+ return v;
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeInformation.of(String.class);
+ }
+}
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
index 6c62cc4..17b4dda 100644
---
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
@@ -23,10 +23,15 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
+import java.io.Serializable;
+
/** RocketMQ connector metrics. */
public class MetricUtils {
public static final String METRICS_TPS = "tps";
+ // https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
+ public static final String CURRENT_FETCH_EVENT_TIME_LAG =
"currentFetchEventTimeLag";
+ public static final String CURRENT_EMIT_EVENT_TIME_LAG =
"currentEmitEventTimeLag";
private static final String METRIC_GROUP_SINK = "sink";
private static final String METRICS_SINK_IN_TPS = "inTps";
@@ -84,4 +89,17 @@ public class MetricUtils {
return value;
}
}
+
+ public static class TimestampGauge implements Gauge<Long>, Serializable {
+ private Long value;
+
+ public void report(long delay) {
+ this.value = delay;
+ }
+
+ @Override
+ public Long getValue() {
+ return value;
+ }
+ }
}
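
`TimestampGauge` in the hunk above keeps the last reported delay in a plain `Long` field. That field is `null` until the first report, and it is written from the pull threads while a metrics reporter may read it from another thread. The following is a minimal sketch of one possible alternative, not the commit's implementation: `LagGaugeSketch` is a hypothetical class that does not implement Flink's `Gauge` interface, and the `-1` sentinel for "nothing reported yet" is an assumption made for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical lag holder with a non-null initial value and cross-thread visibility. */
final class LagGaugeSketch {

    /** Last reported lag in milliseconds; -1 (assumed sentinel) until the first report. */
    private final AtomicLong lastLagMillis = new AtomicLong(-1L);

    /** Records |now - storeTimestamp|, mirroring the Math.abs(...) computation in the diff. */
    void report(long nowMillis, long storeTimestampMillis) {
        lastLagMillis.set(Math.abs(nowMillis - storeTimestampMillis));
    }

    /** Returns the most recently reported lag, or -1 if none was reported. */
    long getValue() {
        return lastLagMillis.get();
    }

    public static void main(String[] args) {
        LagGaugeSketch fetchLag = new LagGaugeSketch();
        System.out.println(fetchLag.getValue());
        fetchLag.report(System.currentTimeMillis(), System.currentTimeMillis() - 250L);
        System.out.println(fetchLag.getValue());
    }
}
```

Whether a sentinel or a `null` value is preferable depends on how the metrics backend renders missing values; the sketch only shows the visibility aspect.
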
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
index 70c26d9..00a24c9 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
@@ -28,4 +28,11 @@ public class TestUtils {
e.printStackTrace();
}
}
+
+ public static Object getFieldValue(Object obj, String fieldName)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field = obj.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(obj);
+ }
}
diff --git
a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
new file mode 100644
index 0000000..8b7b44f
--- /dev/null
+++
b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.legacy.sourceFunction;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
+import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
+import org.apache.rocketmq.flink.legacy.common.config.StartupMode;
+import
org.apache.rocketmq.flink.legacy.common.serialization.SimpleStringDeserializationSchema;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;
+import static
org.apache.rocketmq.flink.legacy.common.util.TestUtils.getFieldValue;
+import static
org.apache.rocketmq.flink.legacy.common.util.TestUtils.setFieldValue;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link RocketMQSourceFunction}. */
+public class RocketMQSourceFunctionTest {
+
+ @Test
+ public void testSetStartupMode() throws NoSuchFieldException,
IllegalAccessException {
+ RocketMQSourceFunction<String> source =
+ new RocketMQSourceFunction<>(
+ new SimpleStringDeserializationSchema(), new
Properties());
+ assertEquals(StartupMode.GROUP_OFFSETS, getFieldValue(source,
"startMode"));
+ source.setStartFromEarliest();
+ assertEquals(StartupMode.EARLIEST, getFieldValue(source, "startMode"));
+ source.setStartFromLatest();
+ assertEquals(StartupMode.LATEST, getFieldValue(source, "startMode"));
+ source.setStartFromTimeStamp(0L);
+ assertEquals(StartupMode.TIMESTAMP, getFieldValue(source,
"startMode"));
+ source.setStartFromSpecificOffsets(null);
+ assertEquals(StartupMode.SPECIFIC_OFFSETS, getFieldValue(source,
"startMode"));
+ source.setStartFromGroupOffsets();
+ assertEquals(StartupMode.GROUP_OFFSETS, getFieldValue(source,
"startMode"));
+ assertEquals(OffsetResetStrategy.LATEST, getFieldValue(source,
"offsetResetStrategy"));
+ source.setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST);
+ assertEquals(OffsetResetStrategy.EARLIEST, getFieldValue(source,
"offsetResetStrategy"));
+ }
+
+ @Test
+ public void testRestartFromCheckpoint() throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty(RocketMQConfig.CONSUMER_GROUP,
"${ConsumerGroup}");
+ properties.setProperty(RocketMQConfig.CONSUMER_TOPIC,
"${SourceTopic}");
+ properties.setProperty(RocketMQConfig.CONSUMER_TAG,
DEFAULT_CONSUMER_TAG);
+ RocketMQSourceFunction<String> source =
+ new RocketMQSourceFunction<>(new
SimpleStringDeserializationSchema(), properties);
+ source.setStartFromLatest();
+ setFieldValue(source, "restored", true);
+ HashMap<MessageQueue, Long> map = new HashMap<>();
+ map.put(new MessageQueue("tpc", "broker-0", 0), 20L);
+ map.put(new MessageQueue("tpc", "broker-0", 1), 21L);
+ map.put(new MessageQueue("tpc", "broker-1", 0), 30L);
+ map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
+ setFieldValue(source, "restoredOffsets", map);
+ setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
+ source.initOffsetTableFromRestoredOffsets();
+ Map<MessageQueue, Long> offsetTable = (Map) getFieldValue(source,
"offsetTable");
+ for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
+ assertEquals(offsetTable.containsKey(entry.getKey()), true);
+ assertEquals(offsetTable.containsValue(entry.getValue()), true);
+ }
+ }
+}