SteNicholas commented on a change in pull request #17994:
URL: https://github.com/apache/flink/pull/17994#discussion_r761877601



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -409,6 +415,9 @@ public int hashCode() {
                         OffsetsInitializer.timestamp(startupTimestampMillis));
                 break;
         }
+        if (setBounded != null) {
+            kafkaSourceBuilder.setBounded(OffsetsInitializer.offsets(setBounded));

Review comment:
       `OffsetsInitializer.offsets(setBounded)` uses the `OffsetResetStrategy.EARLIEST` strategy, which cannot be set as the `stoppingOffsetsInitializer`.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -248,6 +248,16 @@
                             "If the delivery guarantee is configured as "
                                     + DeliveryGuarantee.EXACTLY_ONCE
                                     + " this value is used a prefix for the identifier of all opened Kafka transactions.");
+    // --------------------------------------------------------------------------------------------
+    // Source specific options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<Long> SOURCE_BOUNDED =
+            ConfigOptions.key("source.Bounded")
+                    .longType()
+                    .noDefaultValue()

Review comment:
       The default boundedness in `KafkaSourceBuilder` is `CONTINUOUS_UNBOUNDED`. Should this option therefore have a default value?
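
       To illustrate the question, here is a minimal sketch of how an option without a default could still fall back to the unbounded default. It uses stand-in types rather than the Flink classes: the `Boundedness` enum only mirrors the constant names of Flink's enum, and `BoundednessSketch`, `resolve`, and the `Optional<Long>` parameter are hypothetical names, not part of this PR.

```java
import java.util.Optional;

// Stand-in sketch, not Flink code: shows an absent option mapping to the unbounded default.
final class BoundednessSketch {
    // Mirrors the constant names of Flink's Boundedness enum for illustration only.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // Hypothetical helper: an unset end offset keeps the source unbounded.
    static Boundedness resolve(Optional<Long> endOffset) {
        return endOffset.isPresent()
                ? Boundedness.BOUNDED
                : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty()));
        System.out.println(resolve(Optional.of(100L)));
    }
}
```

       With this shape, `noDefaultValue()` and an explicit default lead to the same behavior as long as the absent case is handled where the option is read.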

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -248,6 +248,16 @@
                             "If the delivery guarantee is configured as "
                                     + DeliveryGuarantee.EXACTLY_ONCE
                                     + " this value is used a prefix for the identifier of all opened Kafka transactions.");
+    // --------------------------------------------------------------------------------------------
+    // Source specific options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<Long> SOURCE_BOUNDED =

Review comment:
       The `Source` interface exposes `getBoundedness` as public API, so this option could be a common table option for all connectors. This option should also represent the end offsets, not a boolean-type value, in your implementation.
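
       As a rough sketch of what an end-offsets option could carry, the snippet below parses a per-partition offset string into a map. It assumes the option would reuse the `partition:0,offset:42;partition:1,offset:300` format of the existing `scan.startup.specific-offsets` option; `EndOffsetsSketch` and `parse` are hypothetical names, and plain `Integer` partition ids stand in for Kafka's `TopicPartition`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch, not Flink code: parses "partition:P,offset:O;..." into partition -> end offset.
final class EndOffsetsSketch {
    private static final String PARTITION = "partition:";
    private static final String OFFSET = "offset:";

    static Map<Integer, Long> parse(String spec) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        for (String pair : spec.split(";")) {
            // Each pair is expected to look like "partition:0,offset:42".
            String[] fields = pair.trim().split(",");
            if (fields.length != 2
                    || !fields[0].trim().startsWith(PARTITION)
                    || !fields[1].trim().startsWith(OFFSET)) {
                throw new IllegalArgumentException("Invalid end offset entry: '" + pair + "'");
            }
            int partition = Integer.parseInt(fields[0].trim().substring(PARTITION.length()).trim());
            long offset = Long.parseLong(fields[1].trim().substring(OFFSET.length()).trim());
            offsets.put(partition, offset);
        }
        return offsets;
    }
}
```

       A map like this could then be converted to `Map<TopicPartition, Long>` for the topic in question before being handed to the source builder.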

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -578,6 +588,7 @@ static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
         public StartupMode startupMode;
         public Map<KafkaTopicPartition, Long> specificOffsets;
         public long startupTimestampMillis;
+        public Map<TopicPartition, Long> setBounded;

Review comment:
       The name of the variable could be `bounded`.




