ruanhang1993 commented on a change in pull request #17994:
URL: https://github.com/apache/flink/pull/17994#discussion_r788383759



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -151,19 +151,38 @@
                     .defaultValue(ScanStartupMode.GROUP_OFFSETS)
                     .withDescription("Startup mode for Kafka consumer.");
 
+    public static final ConfigOption<ScanEndMode> SCAN_END_MODE =
+            ConfigOptions.key("scan.end.mode")
+                    .enumType(ScanEndMode.class)
+                    .defaultValue(ScanEndMode.DEFAULT)
+                    .withDescription("End mode for Kafka consumer.");
+
     public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS =
             ConfigOptions.key("scan.startup.specific-offsets")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
                             "Optional offsets used in case of 
\"specific-offsets\" startup mode");
 
+    public static final ConfigOption<String> SCAN_END_SPECIFIC_OFFSETS =
+            ConfigOptions.key("scan.end.specific-offsets")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional offsets used in case of 
\"specific-offsets\" end mode");
+
     public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
             ConfigOptions.key("scan.startup.timestamp-millis")
                     .longType()
                     .noDefaultValue()
                     .withDescription(
-                            "Optional timestamp used in case of \"timestamp\" 
startup mode");
+                            "Optional timestamp used in case of \"timestamp\" 
startUp mode");
+
+    public static final ConfigOption<Long> SCAN_END_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.end.timestamp-millis")

Review comment:
       scan.bounded.stop-timestamp-millis

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -151,19 +151,38 @@
                     .defaultValue(ScanStartupMode.GROUP_OFFSETS)
                     .withDescription("Startup mode for Kafka consumer.");
 
+    public static final ConfigOption<ScanEndMode> SCAN_END_MODE =
+            ConfigOptions.key("scan.end.mode")

Review comment:
       scan.bounded.mode

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -151,19 +151,38 @@
                     .defaultValue(ScanStartupMode.GROUP_OFFSETS)
                     .withDescription("Startup mode for Kafka consumer.");
 
+    public static final ConfigOption<ScanEndMode> SCAN_END_MODE =
+            ConfigOptions.key("scan.end.mode")
+                    .enumType(ScanEndMode.class)
+                    .defaultValue(ScanEndMode.DEFAULT)
+                    .withDescription("End mode for Kafka consumer.");
+
     public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS =
             ConfigOptions.key("scan.startup.specific-offsets")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
                             "Optional offsets used in case of 
\"specific-offsets\" startup mode");
 
+    public static final ConfigOption<String> SCAN_END_SPECIFIC_OFFSETS =
+            ConfigOptions.key("scan.end.specific-offsets")

Review comment:
       scan.bounded.stop-offsets

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EndMode.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+
+/** End modes for the Kafka Consumer. */
+@Internal
+public enum EndMode {
+    /** End from committed offsets in ZK / Kafka brokers of a specific 
consumer group (default). */
+    END_GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
+
+    /** End from the earliest offset possible. */
+    END_EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
+
+    /** End from the latest offset. */
+    END_LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
+
+    /**
+     * End from user-supplied timestamp for each partition. Since this mode 
will have specific
+     * offsets to end with, we do not need a sentinel value; using 
Long.MIN_VALUE as a placeholder.
+     */
+    END_TIMESTAMP(Long.MIN_VALUE),
+
+    /**
+     * End from user-supplied specific offsets for each partition. Since this 
mode will have
+     * specific offsets to end with, we do not need a sentinel value; using 
Long.MIN_VALUE as a
+     * placeholder.
+     */
+    END_SPECIFIC_OFFSETS(Long.MIN_VALUE),
+
+    DEFAULT(Long.MIN_VALUE);

Review comment:
       DEFAULT is not necessary if we decide to use `noDefaultValue()`.
   Otherwise, give it a more meaningful name.

##########
File path: docs/content/docs/connectors/table/kafka.md
##########
@@ -318,6 +318,13 @@ Connector Options
       <td>Long</td>
       <td>Start from the specified epoch timestamp (milliseconds) used in case 
of <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td><h5>scan.bounded.stop-offsets</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify stop offsets for each partition in case of 
<code>'stop-offsets'</code>, e.g. 
<code>'partition:0,offset:42;partition:1,offset:300'</code>.
+    </td>

Review comment:
       Please add more documentation for this option.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -291,5 +310,36 @@ public InlineElement getDescription() {
         }
     }
 
+    /** End mode for the Kafka consumer, see {@link #SCAN_END_MODE}. */
+    public enum ScanEndMode implements DescribedEnum {
+        END_LATEST_OFFSET("end-latest-offset", text("end from the latest 
offset.")),

Review comment:
       Stop at the latest offset.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -241,6 +291,26 @@ public static StartupOptions 
getStartupOptions(ReadableConfig tableOptions) {
         return options;
     }
 
+    public static EndOptions getEndOptions(ReadableConfig tableOptions) {
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        final EndMode endMode =
+                tableOptions
+                        .getOptional(SCAN_END_MODE)
+                        .map(KafkaConnectorOptionsUtil::fromOption)
+                        .orElse(EndMode.END_GROUP_OFFSETS);

Review comment:
       The default value for the bounded mode should be unbounded. If the source is unbounded, there will be no specific offsets for the source.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -158,6 +158,13 @@
                     .withDescription(
                             "Optional offsets used in case of 
\"specific-offsets\" startup mode");
 
+    public static final ConfigOption<String> SCAN_BOUNDED_SPECIFIC_OFFSETS =
+            ConfigOptions.key("scan.bounded.specific-offsets")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When all partitions have reached their stop 
offsets, the source will exit");
+
     public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =

Review comment:
       > @ruanhang1993 Can you give me an example?
   
   I mean that users may not need the `latest-offset` end mode, but I am not sure about it.
   It is of course better to provide this configuration.
   

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
##########
@@ -166,6 +166,77 @@ public void testKafkaSourceSink() throws Exception {
         deleteTestTopic(topic);
     }
 
+    @Test

Review comment:
       Please add more tests covering the different settings.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EndMode.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+
+/** End modes for the Kafka Consumer. */
+@Internal
+public enum EndMode {

Review comment:
       BoundedMode

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EndMode.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+
+/** End modes for the Kafka Consumer. */
+@Internal
+public enum EndMode {
+    /** End from committed offsets in ZK / Kafka brokers of a specific 
consumer group (default). */
+    END_GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
+
+    /** End from the earliest offset possible. */
+    END_EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

Review comment:
       EARLIEST_OFFSET is not necessary.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -151,19 +151,38 @@
                     .defaultValue(ScanStartupMode.GROUP_OFFSETS)
                     .withDescription("Startup mode for Kafka consumer.");
 
+    public static final ConfigOption<ScanEndMode> SCAN_END_MODE =
+            ConfigOptions.key("scan.end.mode")
+                    .enumType(ScanEndMode.class)
+                    .defaultValue(ScanEndMode.DEFAULT)

Review comment:
       We could use `noDefaultValue()` here, or give it a more meaningful name.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EndMode.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+
+/** End modes for the Kafka Consumer. */
+@Internal
+public enum EndMode {
+    /** End from committed offsets in ZK / Kafka brokers of a specific 
consumer group (default). */
+    END_GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

Review comment:
       The names don't need the prefix `END_`.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -410,6 +445,29 @@ public int hashCode() {
                 break;
         }
 
+        switch (endMode) {
+            case END_LATEST:
+                kafkaSourceBuilder.setUnbounded(OffsetsInitializer.latest());
+                break;
+            case END_GROUP_OFFSETS:
+                
kafkaSourceBuilder.setUnbounded(OffsetsInitializer.committedOffsets());
+                break;
+            case END_SPECIFIC_OFFSETS:
+                Map<TopicPartition, Long> offsets = new HashMap<>();
+                specificEndOffsets.forEach(
+                        (tp, offset) ->
+                                offsets.put(
+                                        new TopicPartition(tp.getTopic(), 
tp.getPartition()),
+                                        offset));
+                
kafkaSourceBuilder.setUnbounded(OffsetsInitializer.offsets(offsets));
+                break;
+            case END_TIMESTAMP:
+                
kafkaSourceBuilder.setUnbounded(OffsetsInitializer.timestamp(endTimestampMillis));
+                break;
+            case DEFAULT:

Review comment:
       We should do nothing by default.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -291,5 +310,36 @@ public InlineElement getDescription() {
         }
     }
 
+    /** End mode for the Kafka consumer, see {@link #SCAN_END_MODE}. */
+    public enum ScanEndMode implements DescribedEnum {

Review comment:
       ScanBoundedMode

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -410,6 +445,29 @@ public int hashCode() {
                 break;
         }
 
+        switch (endMode) {
+            case END_LATEST:
+                kafkaSourceBuilder.setUnbounded(OffsetsInitializer.latest());

Review comment:
       setBounded

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -291,5 +310,36 @@ public InlineElement getDescription() {
         }
     }
 
+    /** End mode for the Kafka consumer, see {@link #SCAN_END_MODE}. */
+    public enum ScanEndMode implements DescribedEnum {
+        END_LATEST_OFFSET("end-latest-offset", text("end from the latest 
offset.")),
+        END_GROUP_OFFSETS(
+                "end-group-offsets",
+                text(
+                        "End from committed offsets in ZooKeeper / Kafka 
brokers of a specific consumer group.")),
+        END_TIMESTAMP("timestamp", text("End from user-supplied timestamp for 
each partition.")),
+        END_SPECIFIC_OFFSETS(
+                "end-specific-offsets",
+                text("End from user-supplied specific offsets for each 
partition.")),
+        DEFAULT("default", text("Default is unbounded flow"));

Review comment:
       Give it a more meaningful name, or remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to