This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e27ff8  [ISSUE #12] Support scan startup mode (#54)
0e27ff8 is described below

commit 0e27ff8a9a3e0f7fba4c71c0cfa3daa00cf9829f
Author: Yubin Li <[email protected]>
AuthorDate: Tue Aug 30 08:39:54 2022 +0800

    [ISSUE #12] Support scan startup mode (#54)
---
 .../rocketmq/flink/common/RocketMQOptions.java     |  6 ++
 .../rocketmq/flink/source/RocketMQSource.java      | 18 ++++--
 .../enumerator/RocketMQSourceEnumerator.java       | 71 +++++++++++++++++-----
 .../reader/RocketMQPartitionSplitReader.java       |  2 +-
 .../table/RocketMQDynamicTableSourceFactory.java   | 25 ++++++++
 .../source/table/RocketMQScanTableSource.java      | 13 +++-
 6 files changed, 114 insertions(+), 21 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index dfdef29..22903c5 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -108,4 +108,10 @@ public class RocketMQOptions {
 
     public static final ConfigOption<String> OPTIONAL_SECRET_KEY =
             ConfigOptions.key("secretKey").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> OPTIONAL_SCAN_STARTUP_MODE =
+            
ConfigOptions.key("scanStartupMode").stringType().defaultValue("latest");
+
+    public static final ConfigOption<Long> OPTIONAL_OFFSET_FROM_TIMESTAMP =
+            
ConfigOptions.key("offsetFromTimestamp").longType().noDefaultValue();
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index 8c804cf..27c69f1 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -58,6 +58,8 @@ public class RocketMQSource<OUT>
                 ResultTypeQueryable<OUT> {
     private static final long serialVersionUID = -1L;
 
+    private final String consumerOffsetMode;
+    private final long consumerOffsetTimestamp;
     private final String topic;
     private final String consumerGroup;
     private final String nameServerAddress;
@@ -89,7 +91,9 @@ public class RocketMQSource<OUT>
             long startOffset,
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
-            RocketMQDeserializationSchema<OUT> deserializationSchema) {
+            RocketMQDeserializationSchema<OUT> deserializationSchema,
+            String cosumerOffsetMode,
+            long consumerOffsetTimestamp) {
         Validate.isTrue(
                 !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
                 "Consumer tag and sql can not set value at the same time");
@@ -102,10 +106,12 @@ public class RocketMQSource<OUT>
         this.sql = sql;
         this.stopInMs = stopInMs;
         this.startTime = startTime;
-        this.startOffset = startOffset;
+        this.startOffset = startOffset > 0 ? startOffset : startTime;
         this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
         this.boundedness = boundedness;
         this.deserializationSchema = deserializationSchema;
+        this.consumerOffsetMode = cosumerOffsetMode;
+        this.consumerOffsetTimestamp = consumerOffsetTimestamp;
     }
 
     @Override
@@ -169,7 +175,9 @@ public class RocketMQSource<OUT>
                 startOffset,
                 partitionDiscoveryIntervalMs,
                 boundedness,
-                enumContext);
+                enumContext,
+                consumerOffsetMode,
+                consumerOffsetTimestamp);
     }
 
     @Override
@@ -188,7 +196,9 @@ public class RocketMQSource<OUT>
                 partitionDiscoveryIntervalMs,
                 boundedness,
                 enumContext,
-                checkpoint.getCurrentAssignment());
+                checkpoint.getCurrentAssignment(),
+                consumerOffsetMode,
+                consumerOffsetTimestamp);
     }
 
     @Override
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
index db593ec..38aa132 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -49,7 +49,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+
+import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
+import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
+import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
 
 /** The enumerator class for RocketMQ source. */
 @Internal
@@ -57,7 +60,9 @@ public class RocketMQSourceEnumerator
         implements SplitEnumerator<RocketMQPartitionSplit, 
RocketMQSourceEnumState> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RocketMQSourceEnumerator.class);
-
+    private final Map<MessageQueue, Long> offsetTable = new HashMap<>();
+    private final String consumerOffsetMode;
+    private final long consumerOffsetTimestamp;
     /** The topic used for this RocketMQSource. */
     private final String topic;
     /** The consumer group used for this RocketMQSource. */
@@ -108,7 +113,9 @@ public class RocketMQSourceEnumerator
             long startOffset,
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
-            SplitEnumeratorContext<RocketMQPartitionSplit> context) {
+            SplitEnumeratorContext<RocketMQPartitionSplit> context,
+            String consumerOffsetMode,
+            long consumerOffsetTimestamp) {
         this(
                 topic,
                 consumerGroup,
@@ -120,7 +127,9 @@ public class RocketMQSourceEnumerator
                 partitionDiscoveryIntervalMs,
                 boundedness,
                 context,
-                new HashMap<>());
+                new HashMap<>(),
+                consumerOffsetMode,
+                consumerOffsetTimestamp);
     }
 
     public RocketMQSourceEnumerator(
@@ -134,7 +143,9 @@ public class RocketMQSourceEnumerator
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
             SplitEnumeratorContext<RocketMQPartitionSplit> context,
-            Map<Integer, List<RocketMQPartitionSplit>> 
currentSplitsAssignments) {
+            Map<Integer, List<RocketMQPartitionSplit>> 
currentSplitsAssignments,
+            String consumerOffsetMode,
+            long consumerOffsetTimestamp) {
         this.topic = topic;
         this.consumerGroup = consumerGroup;
         this.nameServerAddress = nameServerAddress;
@@ -158,6 +169,8 @@ public class RocketMQSourceEnumerator
                                                         s.getBroker(),
                                                         s.getPartition()))));
         this.pendingPartitionSplitAssignment = new HashMap<>();
+        this.consumerOffsetMode = consumerOffsetMode;
+        this.consumerOffsetTimestamp = consumerOffsetTimestamp;
     }
 
     @Override
@@ -221,6 +234,7 @@ public class RocketMQSourceEnumerator
         Set<Tuple3<String, String, Integer>> removedPartitions =
                 new 
HashSet<>(Collections.unmodifiableSet(discoveredPartitions));
         Set<MessageQueue> messageQueues = 
consumer.fetchSubscribeMessageQueues(topic);
+        Set<RocketMQPartitionSplit> result = new HashSet<>();
         for (MessageQueue messageQueue : messageQueues) {
             Tuple3<String, String, Integer> topicPartition =
                     new Tuple3<>(
@@ -229,19 +243,17 @@ public class RocketMQSourceEnumerator
                             messageQueue.getQueueId());
             if (!removedPartitions.remove(topicPartition)) {
                 newPartitions.add(topicPartition);
+                result.add(
+                        new RocketMQPartitionSplit(
+                                topicPartition.f0,
+                                topicPartition.f1,
+                                topicPartition.f2,
+                                getOffsetByMessageQueue(messageQueue),
+                                stopInMs));
             }
         }
         
discoveredPartitions.addAll(Collections.unmodifiableSet(newPartitions));
-        return newPartitions.stream()
-                .map(
-                        messageQueue ->
-                                new RocketMQPartitionSplit(
-                                        messageQueue.f0,
-                                        messageQueue.f1,
-                                        messageQueue.f2,
-                                        startOffset,
-                                        stopInMs))
-                .collect(Collectors.toSet());
+        return result;
     }
 
     // This method should only be invoked in the coordinator executor thread.
@@ -317,6 +329,35 @@ public class RocketMQSourceEnumerator
                 });
     }
 
+    private long getOffsetByMessageQueue(MessageQueue mq) throws 
MQClientException {
+        Long offset = offsetTable.get(mq);
+        if (offset == null) {
+            if (startOffset > 0) {
+                offset = startOffset;
+            } else {
+                switch (consumerOffsetMode) {
+                    case CONSUMER_OFFSET_EARLIEST:
+                        offset = consumer.minOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_LATEST:
+                        offset = consumer.maxOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_TIMESTAMP:
+                        offset = consumer.searchOffset(mq, 
consumerOffsetTimestamp);
+                        break;
+                    default:
+                        offset = consumer.fetchConsumeOffset(mq, false);
+                        if (offset < 0) {
+                            throw new IllegalArgumentException(
+                                    "Unknown value for 
CONSUMER_OFFSET_RESET_TO.");
+                        }
+                }
+            }
+        }
+        offsetTable.put(mq, offset);
+        return offsetTable.get(mq);
+    }
+
     private void initialRocketMQConsumer() {
         try {
             if (!StringUtils.isNullOrWhitespaceOnly(accessKey)
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index ad25f0e..ca9c3f1 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -266,7 +266,7 @@ public class RocketMQPartitionSplitReader<T>
                             "The SplitChange type of %s is not supported.",
                             splitsChange.getClass()));
         }
-        // Setup the stopping timestamps.
+        // Set up the stopping timestamps.
         splitsChange
                 .splits()
                 .forEach(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index 1c66dab..6db5075 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.rocketmq.flink.source.table;
 
+import org.apache.rocketmq.flink.common.RocketMQOptions;
+
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
@@ -48,6 +50,7 @@ import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DE
 import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
 import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
 import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE;
 import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SECRET_KEY;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SQL;
 import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
@@ -57,6 +60,7 @@ import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TAG;
 import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
 import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_USE_NEW_API;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC;
+import static 
org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
 
 /**
  * Defines the {@link DynamicTableSourceFactory} implementation to create 
{@link
@@ -99,6 +103,7 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
         optionalOptions.add(OPTIONAL_LENGTH_CHECK);
         optionalOptions.add(OPTIONAL_ACCESS_KEY);
         optionalOptions.add(OPTIONAL_SECRET_KEY);
+        optionalOptions.add(OPTIONAL_SCAN_STARTUP_MODE);
         return optionalOptions;
     }
 
@@ -113,6 +118,18 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
         String nameServerAddress = 
configuration.getString(NAME_SERVER_ADDRESS);
         String tag = configuration.getString(OPTIONAL_TAG);
         String sql = configuration.getString(OPTIONAL_SQL);
+        if (configuration.contains(OPTIONAL_SCAN_STARTUP_MODE)
+                && (configuration.contains(OPTIONAL_START_MESSAGE_OFFSET)
+                        || configuration.contains(OPTIONAL_START_TIME_MILLS)
+                        || configuration.contains(OPTIONAL_START_TIME))) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "cannot support these configs when %s has been 
set: [%s, %s, %s] !",
+                            OPTIONAL_SCAN_STARTUP_MODE.key(),
+                            OPTIONAL_START_MESSAGE_OFFSET.key(),
+                            OPTIONAL_START_TIME.key(),
+                            OPTIONAL_START_TIME_MILLS.key()));
+        }
         long startMessageOffset = 
configuration.getLong(OPTIONAL_START_MESSAGE_OFFSET);
         long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS);
         String startDateTime = configuration.getString(OPTIONAL_START_TIME);
@@ -158,6 +175,12 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
         TableSchema physicalSchema =
                 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         descriptorProperties.putTableSchema("schema", physicalSchema);
+        String consumerOffsetMode =
+                configuration.getString(
+                        RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE, 
CONSUMER_OFFSET_LATEST);
+        long consumerOffsetTimestamp =
+                configuration.getLong(
+                        RocketMQOptions.OPTIONAL_OFFSET_FROM_TIMESTAMP, 
System.currentTimeMillis());
         return new RocketMQScanTableSource(
                 descriptorProperties,
                 physicalSchema,
@@ -172,6 +195,8 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
                 startMessageOffset,
                 startMessageOffset < 0 ? startTime : -1L,
                 partitionDiscoveryIntervalMs,
+                consumerOffsetMode,
+                consumerOffsetTimestamp,
                 useNewApi);
     }
 
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index 77420e5..dc92a47 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -56,6 +56,9 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
     private final DescriptorProperties properties;
     private final TableSchema schema;
 
+    private final String consumerOffsetMode;
+    private final long consumerOffsetTimestamp;
+
     private final String topic;
     private final String consumerGroup;
     private final String nameServerAddress;
@@ -87,6 +90,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
             long startMessageOffset,
             long startTime,
             long partitionDiscoveryIntervalMs,
+            String consumerOffsetMode,
+            long consumerOffsetTimestamp,
             boolean useNewApi) {
         this.properties = properties;
         this.schema = schema;
@@ -103,6 +108,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
         this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
         this.useNewApi = useNewApi;
         this.metadataKeys = Collections.emptyList();
+        this.consumerOffsetMode = consumerOffsetMode;
+        this.consumerOffsetTimestamp = consumerOffsetTimestamp;
     }
 
     @Override
@@ -127,7 +134,9 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
                             startMessageOffset < 0 ? 0 : startMessageOffset,
                             partitionDiscoveryIntervalMs,
                             isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
-                            createRocketMQDeserializationSchema()));
+                            createRocketMQDeserializationSchema(),
+                            consumerOffsetMode,
+                            consumerOffsetTimestamp));
         } else {
             return SourceFunctionProvider.of(
                     new RocketMQSourceFunction<>(
@@ -166,6 +175,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
                         startMessageOffset,
                         startTime,
                         partitionDiscoveryIntervalMs,
+                        consumerOffsetMode,
+                        consumerOffsetTimestamp,
                         useNewApi);
         tableSource.metadataKeys = metadataKeys;
         return tableSource;

Reply via email to