This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new e8d5350  [ISSUE #59] Support rich initialization modes of RocketMQ 
source (#62)
e8d5350 is described below

commit e8d535099ccc1c999e13b120d5de119df999a724
Author: 高思伟 <[email protected]>
AuthorDate: Mon Oct 10 15:09:14 2022 +0800

    [ISSUE #59] Support rich initialization modes of RocketMQ source (#62)
    
    1.Add rich initialization modes those are similar to kafka-connector
    2.Add metrics in source phase
    
    Co-authored-by: 高思伟 <[email protected]>
---
 README.md                                          |  38 +++-
 pom.xml                                            |  14 ++
 .../flink/legacy/RocketMQSourceFunction.java       | 232 +++++++++++++++------
 .../OffsetResetStrategy.java}                      |  19 +-
 .../TestUtils.java => config/StartupMode.java}     |  20 +-
 .../SimpleStringDeserializationSchema.java         |  39 ++++
 .../flink/legacy/common/util/MetricUtils.java      |  18 ++
 .../flink/legacy/common/util/TestUtils.java        |   7 +
 .../sourceFunction/RocketMQSourceFunctionTest.java |  88 ++++++++
 9 files changed, 387 insertions(+), 88 deletions(-)

diff --git a/README.md b/README.md
index fa53dd7..be889eb 100644
--- a/README.md
+++ b/README.md
@@ -73,7 +73,12 @@ StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironm
         Properties producerProps = new Properties();
         producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, 
"localhost:9876");
 
-        env.addSource(new RocketMQSourceFunction(new 
SimpleKeyValueDeserializationSchema("id", "address"), consumerProps))
+        RocketMQSourceFunction<Map<Object,Object>> source = new 
RocketMQSourceFunction(
+                new SimpleKeyValueDeserializationSchema("id", "address"), 
consumerProps);
+        // use group offsets.
+        // If there is no committed offset, the consumer would start from the latest offset.
+        source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST);
+        env.addSource(source)
             .name("rocketmq-source")
             .setParallelism(2)
             .process(new ProcessFunction<Map, Map>() {
@@ -123,13 +128,40 @@ The following configurations are all from the class 
`org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the 
server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when 
`consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` 
|
 | consumer.offset.persist.interval | auto commit offset interval      |    
5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 
|
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were 
not found      |    10 |
 
+### Consumer Strategy
+
+```java
+RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(
+        new SimpleStringDeserializationSchema(), props);
+HashMap<MessageQueue, Long> brokerMap = new HashMap<>();
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-a", 1), 
201L);
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-c", 3), 
123L);
+source.setStartFromSpecificOffsets(brokerMap);
+```
RocketMQSourceFunction offers five initialization policies:
+* setStartFromEarliest
+* setStartFromLatest
+* setStartFromTimeStamp with timestamp
+* setStartFromGroupOffsets with `OffsetResetStrategy`
+* setStartFromSpecificOffsets
+
+| STRATEGY                    | DESCRIPTION                                    
              |
+| --------------------------- | 
------------------------------------------------------------ |
+| EARLIEST                    | consume from the earliest offset after restart 
with no state |
+| LATEST                      | consume from the latest offset after restart 
with no state   |
+| TIMESTAMP                   | consume from the closest timestamp of data in 
each broker's queue |
+| GROUP_OFFSETS with LATEST   | If the broker has a committed offset, consume from the next one; otherwise consume from the latest offset |
+| GROUP_OFFSETS with EARLIEST | If the broker has a committed offset, consume from the next one; otherwise consume from the earliest offset. It's useful when the server expands brokers |
+| SPECIFIC_OFFSETS            | consume from the specific offsets in the broker's queues. Group offsets will be used for those broker's queues that were not specified |
+
+**Attention**
+
+These strategies are effective only if the Flink job starts with no state. If the job recovers from a checkpoint, the offsets would be initialized from the stored data.
 
 ## RocketMQ SQL Connector
 
diff --git a/pom.xml b/pom.xml
index 1b725e1..8617bd2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -315,6 +315,20 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <phase>compile</phase>
+                        <goals>
+                            <goal>jar-no-fork</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index db2ab15..b078056 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -23,6 +23,8 @@ import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
+import org.apache.rocketmq.flink.legacy.common.config.StartupMode;
 import 
org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
 import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
 import org.apache.rocketmq.flink.legacy.common.util.RetryUtil;
@@ -76,13 +78,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_BATCH_SIZE;
-import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
-import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
-import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
 import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
-import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;
 import static 
org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
-import static 
org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong;
 
 /**
  * The RocketMQSource is based on RocketMQ pull consumer mode, and provides 
exactly once reliability
@@ -116,12 +113,33 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
     private Properties props;
     private String topic;
     private String group;
-    private long startMessageOffset;
     private transient volatile boolean restored;
     private transient boolean enableCheckpoint;
     private volatile Object checkPointLock;
 
     private Meter tpsMetric;
+    private MetricUtils.TimestampGauge fetchDelay = new 
MetricUtils.TimestampGauge();
+    private MetricUtils.TimestampGauge emitDelay = new 
MetricUtils.TimestampGauge();
+
+    /** The startup mode for the consumer (default is {@link 
StartupMode#GROUP_OFFSETS}). */
+    private StartupMode startMode = StartupMode.GROUP_OFFSETS;
+
+    /**
+     * If StartupMode#GROUP_OFFSETS has no committed offset, OffsetResetStrategy would offer the
+     * initialization strategy.
+     */
+    private OffsetResetStrategy offsetResetStrategy = 
OffsetResetStrategy.LATEST;
+
+    /**
+     * Specific startup offsets; only relevant when startup mode is {@link
+     * StartupMode#SPECIFIC_OFFSETS}.
+     */
+    private Map<MessageQueue, Long> specificStartupOffsets;
+
+    /**
+     * Specific startup offsets; only relevant when startup mode is {@link 
StartupMode#TIMESTAMP}.
+     */
+    private long specificTimeStamp;
 
     public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> schema, 
Properties props) {
         this.schema = schema;
@@ -135,11 +153,6 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
 
         this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
         this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
-        this.startMessageOffset =
-                props.containsKey(RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET)
-                        ? Long.parseLong(
-                                
props.getProperty(RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET))
-                        : DEFAULT_START_MESSAGE_OFFSET;
 
         Validate.notEmpty(topic, "Consumer topic can not be empty");
         Validate.notEmpty(group, "Consumer group can not be empty");
@@ -214,14 +227,13 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                 getRuntimeContext()
                         .getMetricGroup()
                         .meter(MetricUtils.METRICS_TPS, new 
MeterView(outputCounter, 60));
-    }
 
-    @Override
-    public void run(SourceContext context) throws Exception {
-        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
-        String tag =
-                props.getProperty(RocketMQConfig.CONSUMER_TAG, 
RocketMQConfig.DEFAULT_CONSUMER_TAG);
-        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, 
DEFAULT_CONSUMER_BATCH_SIZE);
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay);
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, emitDelay);
 
         final RuntimeContext ctx = getRuntimeContext();
         // The lock that guarantees that record emission and state updates are 
atomic,
@@ -229,7 +241,22 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
         int taskNumber = ctx.getNumberOfParallelSubtasks();
         int taskIndex = ctx.getIndexOfThisSubtask();
         log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", 
taskNumber, taskIndex);
+        Collection<MessageQueue> totalQueues = 
consumer.fetchSubscribeMessageQueues(topic);
+        messageQueues =
+                RocketMQUtils.allocate(totalQueues, taskNumber, 
ctx.getIndexOfThisSubtask());
+        // If the job recovers from the state, the state has already contained 
the offsets of last
+        // commit.
+        if (!restored) {
+            initOffsets(messageQueues);
+        }
+    }
 
+    @Override
+    public void run(SourceContext context) throws Exception {
+        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
+        String tag =
+                props.getProperty(RocketMQConfig.CONSUMER_TAG, 
RocketMQConfig.DEFAULT_CONSUMER_TAG);
+        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, 
DEFAULT_CONSUMER_BATCH_SIZE);
         timer.scheduleAtFixedRate(
                 () -> {
                     // 
context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
@@ -238,10 +265,6 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                 5,
                 5,
                 TimeUnit.SECONDS);
-
-        Collection<MessageQueue> totalQueues = 
consumer.fetchSubscribeMessageQueues(topic);
-        messageQueues =
-                RocketMQUtils.allocate(totalQueues, taskNumber, 
ctx.getIndexOfThisSubtask());
         for (MessageQueue mq : messageQueues) {
             this.executor.execute(
                     () ->
@@ -249,8 +272,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                                     () -> {
                                         while (runningChecker.isRunning()) {
                                             try {
-                                                long offset = 
getMessageQueueOffset(mq);
-
+                                                long offset = 
offsetTable.get(mq);
                                                 PullResult pullResult;
                                                 if (StringUtils.isEmpty(sql)) {
                                                     pullResult =
@@ -271,6 +293,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                                                     case FOUND:
                                                         List<MessageExt> 
messages =
                                                                 
pullResult.getMsgFoundList();
+                                                        long fetchTime = 
System.currentTimeMillis();
                                                         for (MessageExt msg : 
messages) {
                                                             byte[] key =
                                                                     
msg.getKeys() != null
@@ -299,12 +322,24 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                                                                 
context.collectWithTimestamp(
                                                                         data,
                                                                         
msg.getBornTimestamp());
+                                                                long emitTime =
+                                                                        
System.currentTimeMillis();
 
                                                                 // update max 
eventTime per queue
                                                                 // 
waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
                                                                 
waterMarkForAll.extractTimestamp(
                                                                         
msg.getBornTimestamp());
                                                                 
tpsMetric.markEvent();
+                                                                long eventTime 
=
+                                                                        
msg.getStoreTimestamp();
+                                                                
fetchDelay.report(
+                                                                        
Math.abs(
+                                                                               
 fetchTime
+                                                                               
         - eventTime));
+                                                                
emitDelay.report(
+                                                                        
Math.abs(
+                                                                               
 emitTime
+                                                                               
         - eventTime));
                                                             }
                                                         }
                                                         found = true;
@@ -359,46 +394,121 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
         }
     }
 
-    private long getMessageQueueOffset(MessageQueue mq) throws 
MQClientException {
-        Long offset = offsetTable.get(mq);
-        if (offset != null) {
-            return offset;
-        }
-        // restoredOffsets(unionOffsetStates) is the restored global union 
state;
-        // should only snapshot mqs that actually belong to us
-        if (startMessageOffset == DEFAULT_START_MESSAGE_OFFSET) {
-            // fetchConsumeOffset from broker
-            offset = consumer.fetchConsumeOffset(mq, false);
-            if (!restored && offset < 0) {
-                String initialOffset =
-                        props.getProperty(
-                                RocketMQConfig.CONSUMER_OFFSET_RESET_TO, 
CONSUMER_OFFSET_LATEST);
-                switch (initialOffset) {
-                    case CONSUMER_OFFSET_EARLIEST:
-                        offset = consumer.minOffset(mq);
-                        break;
-                    case CONSUMER_OFFSET_LATEST:
-                        offset = consumer.maxOffset(mq);
-                        break;
-                    case CONSUMER_OFFSET_TIMESTAMP:
-                        offset =
-                                consumer.searchOffset(
+    /**
+     * Only a Flink job that starts with no state can initialize offsets from the broker.
+     *
+     * @param messageQueues
+     * @throws MQClientException
+     */
+    private void initOffsets(List<MessageQueue> messageQueues) throws 
MQClientException {
+        for (MessageQueue mq : messageQueues) {
+            long offset;
+            switch (startMode) {
+                case LATEST:
+                    offset = consumer.maxOffset(mq);
+                    break;
+                case EARLIEST:
+                    offset = consumer.minOffset(mq);
+                    break;
+                case GROUP_OFFSETS:
+                    offset = consumer.fetchConsumeOffset(mq, false);
+                    // The min offset is returned if the consumer group joins for the first time;
+                    // a negative number is returned if an exception is caught when fetching
+                    // from the broker.
+                    // If you want to consume from the earliest, please use
+                    // OffsetResetStrategy.EARLIEST
+                    if (offset <= 0) {
+                        switch (offsetResetStrategy) {
+                            case LATEST:
+                                offset = consumer.maxOffset(mq);
+                                log.info(
+                                        "current consumer thread:{} has no 
committed offset,use Strategy:{} instead",
                                         mq,
-                                        getLong(
-                                                props,
-                                                
RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP,
-                                                System.currentTimeMillis()));
-                        break;
-                    default:
-                        throw new IllegalArgumentException(
-                                "Unknown value for CONSUMER_OFFSET_RESET_TO.");
-                }
+                                        offsetResetStrategy);
+                                break;
+                            case EARLIEST:
+                                log.info(
+                                        "current consumer thread:{} has no 
committed offset,use Strategy:{} instead",
+                                        mq,
+                                        offsetResetStrategy);
+                                offset = consumer.minOffset(mq);
+                                break;
+                            default:
+                                break;
+                        }
+                    }
+                    break;
+                case TIMESTAMP:
+                    offset = consumer.searchOffset(mq, specificTimeStamp);
+                    break;
+                case SPECIFIC_OFFSETS:
+                    if (specificStartupOffsets == null) {
+                        throw new RuntimeException(
+                                "StartMode is specific_offsets.But none 
offsets has been specified");
+                    }
+                    Long specificOffset = specificStartupOffsets.get(mq);
+                    if (specificOffset != null) {
+                        offset = specificOffset;
+                    } else {
+                        offset = consumer.fetchConsumeOffset(mq, false);
+                    }
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            "current startMode is not supported" + startMode);
             }
-        } else {
-            offset = startMessageOffset;
+            log.info(
+                    "current consumer queue:{} start from offset of: {}",
+                    mq.getBrokerName() + "-" + mq.getQueueId(),
+                    offset);
+            offsetTable.put(mq, offset);
         }
-        offsetTable.put(mq, offset);
-        return offsetTable.get(mq);
+    }
+
+    /** consume from the min offset at every restart with no state */
+    public RocketMQSourceFunction<OUT> setStartFromEarliest() {
+        this.startMode = StartupMode.EARLIEST;
+        return this;
+    }
+
+    /** consume from the max offset of each broker's queue at every restart 
with no state */
+    public RocketMQSourceFunction<OUT> setStartFromLatest() {
+        this.startMode = StartupMode.LATEST;
+        return this;
+    }
+
+    /** consume from the closest offset */
+    public RocketMQSourceFunction<OUT> setStartFromTimeStamp(long timeStamp) {
+        this.startMode = StartupMode.TIMESTAMP;
+        this.specificTimeStamp = timeStamp;
+        return this;
+    }
+
+    /** Consume from the group offsets that were stored in the brokers. */
+    public RocketMQSourceFunction<OUT> setStartFromGroupOffsets() {
+        this.startMode = StartupMode.GROUP_OFFSETS;
+        return this;
+    }
+
+    /**
+     * Consume from the group offsets that were stored in the brokers. If there is no committed
+     * offset, #{@link OffsetResetStrategy} would provide the initialization policy.
+     */
+    public RocketMQSourceFunction<OUT> setStartFromGroupOffsets(
+            OffsetResetStrategy offsetResetStrategy) {
+        this.startMode = StartupMode.GROUP_OFFSETS;
+        this.offsetResetStrategy = offsetResetStrategy;
+        return this;
+    }
+
+    /**
+     * Consume from the specific offsets. Group offsets are enabled for those queues where the
+     * broker didn't specify an offset.
+     */
+    public RocketMQSourceFunction<OUT> setStartFromSpecificOffsets(
+            Map<MessageQueue, Long> specificOffsets) {
+        this.specificStartupOffsets = specificOffsets;
+        this.startMode = StartupMode.SPECIFIC_OFFSETS;
+        return this;
     }
 
     private void updateMessageQueueOffset(MessageQueue mq, long offset) throws 
MQClientException {
diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/OffsetResetStrategy.java
similarity index 65%
copy from 
src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
copy to 
src/main/java/org/apache/rocketmq/flink/legacy/common/config/OffsetResetStrategy.java
index 70c26d9..a48e6d4 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
+++ 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/OffsetResetStrategy.java
@@ -14,18 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.flink.legacy.common.util;
 
-import java.lang.reflect.Field;
+package org.apache.rocketmq.flink.legacy.common.config;
 
-public class TestUtils {
-    public static void setFieldValue(Object obj, String fieldName, Object 
value) {
-        try {
-            Field field = obj.getClass().getDeclaredField(fieldName);
-            field.setAccessible(true);
-            field.set(obj, value);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+/** Config for #{@link StartupMode#GROUP_OFFSETS}. */
+public enum OffsetResetStrategy {
+    /** If group offsets are not found, the latest offset would be used to start the consumer. */
+    LATEST,
+    /** If group offsets are not found, the earliest offset would be used to start the consumer. */
+    EARLIEST
 }
diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java
similarity index 65%
copy from 
src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
copy to 
src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java
index 70c26d9..163dae4 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
+++ 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java
@@ -14,18 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.flink.legacy.common.util;
 
-import java.lang.reflect.Field;
+package org.apache.rocketmq.flink.legacy.common.config;
 
-public class TestUtils {
-    public static void setFieldValue(Object obj, String fieldName, Object 
value) {
-        try {
-            Field field = obj.getClass().getDeclaredField(fieldName);
-            field.setAccessible(true);
-            field.set(obj, value);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+/** RocketMQ startup mode. */
+public enum StartupMode {
+    EARLIEST,
+    LATEST,
+    GROUP_OFFSETS,
+    TIMESTAMP,
+    SPECIFIC_OFFSETS
 }
diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleStringDeserializationSchema.java
 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleStringDeserializationSchema.java
new file mode 100644
index 0000000..f59b961
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleStringDeserializationSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+
+/** deserialize the message body to string */
+public class SimpleStringDeserializationSchema implements 
KeyValueDeserializationSchema<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String deserializeKeyAndValue(byte[] key, byte[] value) {
+        String v = value != null ? new String(value, StandardCharsets.UTF_8) : 
null;
+        return v;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
index 6c62cc4..17b4dda 100644
--- 
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
+++ 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
@@ -23,10 +23,15 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.SimpleCounter;
 
+import java.io.Serializable;
+
 /** RocketMQ connector metrics. */
 public class MetricUtils {
 
     public static final String METRICS_TPS = "tps";
+    // 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
+    public static final String CURRENT_FETCH_EVENT_TIME_LAG = 
"currentFetchEventTimeLag";
+    public static final String CURRENT_EMIT_EVENT_TIME_LAG = 
"currentEmitEventTimeLag";
 
     private static final String METRIC_GROUP_SINK = "sink";
     private static final String METRICS_SINK_IN_TPS = "inTps";
@@ -84,4 +89,17 @@ public class MetricUtils {
             return value;
         }
     }
+
+    public static class TimestampGauge implements Gauge<Long>, Serializable {
+        private Long value;
+
+        public void report(long delay) {
+            this.value = delay;
+        }
+
+        @Override
+        public Long getValue() {
+            return value;
+        }
+    }
 }
diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
index 70c26d9..00a24c9 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
@@ -28,4 +28,11 @@ public class TestUtils {
             e.printStackTrace();
         }
     }
+
+    public static Object getFieldValue(Object obj, String fieldName)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = obj.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(obj);
+    }
 }
diff --git 
a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
 
b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
new file mode 100644
index 0000000..8b7b44f
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.legacy.sourceFunction;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
+import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
+import org.apache.rocketmq.flink.legacy.common.config.StartupMode;
+import 
org.apache.rocketmq.flink.legacy.common.serialization.SimpleStringDeserializationSchema;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;
+import static 
org.apache.rocketmq.flink.legacy.common.util.TestUtils.getFieldValue;
+import static 
org.apache.rocketmq.flink.legacy.common.util.TestUtils.setFieldValue;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link RocketMQSourceFunction}. */
+public class RocketMQSourceFunctionTest {
+
+    @Test
+    public void testSetStartupMode() throws NoSuchFieldException, 
IllegalAccessException {
+        RocketMQSourceFunction<String> source =
+                new RocketMQSourceFunction<>(
+                        new SimpleStringDeserializationSchema(), new 
Properties());
+        assertEquals(StartupMode.GROUP_OFFSETS, getFieldValue(source, 
"startMode"));
+        source.setStartFromEarliest();
+        assertEquals(StartupMode.EARLIEST, getFieldValue(source, "startMode"));
+        source.setStartFromLatest();
+        assertEquals(StartupMode.LATEST, getFieldValue(source, "startMode"));
+        source.setStartFromTimeStamp(0L);
+        assertEquals(StartupMode.TIMESTAMP, getFieldValue(source, 
"startMode"));
+        source.setStartFromSpecificOffsets(null);
+        assertEquals(StartupMode.SPECIFIC_OFFSETS, getFieldValue(source, 
"startMode"));
+        source.setStartFromGroupOffsets();
+        assertEquals(StartupMode.GROUP_OFFSETS, getFieldValue(source, 
"startMode"));
+        assertEquals(OffsetResetStrategy.LATEST, getFieldValue(source, 
"offsetResetStrategy"));
+        source.setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST);
+        assertEquals(OffsetResetStrategy.EARLIEST, getFieldValue(source, 
"offsetResetStrategy"));
+    }
+
+    @Test
+    public void testRestartFromCheckpoint() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty(RocketMQConfig.CONSUMER_GROUP, 
"${ConsumerGroup}");
+        properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, 
"${SourceTopic}");
+        properties.setProperty(RocketMQConfig.CONSUMER_TAG, 
DEFAULT_CONSUMER_TAG);
+        RocketMQSourceFunction<String> source =
+                new RocketMQSourceFunction<>(new 
SimpleStringDeserializationSchema(), properties);
+        source.setStartFromLatest();
+        setFieldValue(source, "restored", true);
+        HashMap<MessageQueue, Long> map = new HashMap<>();
+        map.put(new MessageQueue("tpc", "broker-0", 0), 20L);
+        map.put(new MessageQueue("tpc", "broker-0", 1), 21L);
+        map.put(new MessageQueue("tpc", "broker-1", 0), 30L);
+        map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
+        setFieldValue(source, "restoredOffsets", map);
+        setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
+        source.initOffsetTableFromRestoredOffsets();
+        Map<MessageQueue, Long> offsetTable = (Map) getFieldValue(source, 
"offsetTable");
+        for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
+            assertEquals(offsetTable.containsKey(entry.getKey()), true);
+            assertEquals(offsetTable.containsValue(entry.getValue()), true);
+        }
+    }
+}


Reply via email to