This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new e591455  [FLINK-24087][connector-kafka] Avoid importing Table API classes for DataStream API programs
e591455 is described below

commit e591455f4d6a5b93899069f323a5947dc5958b03
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Aug 31 23:39:37 2021 +0800

    [FLINK-24087][connector-kafka] Avoid importing Table API classes for DataStream API programs
    
    This closes #17082.
---
 .../connectors/kafka/config/StartupMode.java       | 21 ------------------
 .../kafka/table/KafkaConnectorOptionsUtil.java     | 25 +++++++++++++++++++++-
 2 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index 2efe0d2..0aa43ec 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -19,8 +19,6 @@ package org.apache.flink.streaming.connectors.kafka.config;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
-import org.apache.flink.table.api.TableException;
 
 /** Startup modes for the Kafka Consumer. */
 @Internal
@@ -58,25 +56,6 @@ public enum StartupMode {
         this.stateSentinel = stateSentinel;
     }
 
-    public static StartupMode fromOption(ScanStartupMode scanStartupMode) {
-        switch (scanStartupMode) {
-            case EARLIEST_OFFSET:
-                return StartupMode.EARLIEST;
-            case LATEST_OFFSET:
-                return StartupMode.LATEST;
-            case GROUP_OFFSETS:
-                return StartupMode.GROUP_OFFSETS;
-            case SPECIFIC_OFFSETS:
-                return StartupMode.SPECIFIC_OFFSETS;
-            case TIMESTAMP:
-                return StartupMode.TIMESTAMP;
-
-            default:
-                throw new TableException(
-                        "Unsupported startup mode. Validator should have checked that.");
-        }
-    }
-
     public long getStateSentinel() {
         return stateSentinel;
     }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index c8ea244..5514a5c 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -224,7 +224,7 @@ class KafkaConnectorOptionsUtil {
         final StartupMode startupMode =
                 tableOptions
                         .getOptional(SCAN_STARTUP_MODE)
-                        .map(StartupMode::fromOption)
+                        .map(KafkaConnectorOptionsUtil::fromOption)
                         .orElse(StartupMode.GROUP_OFFSETS);
         if (startupMode == StartupMode.SPECIFIC_OFFSETS) {
             // It will be refactored after support specific offset for multiple topics in
@@ -257,6 +257,29 @@ class KafkaConnectorOptionsUtil {
                 });
     }
 
+    /**
+     * Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link
+     * ScanStartupMode}.
+     */
+    private static StartupMode fromOption(ScanStartupMode scanStartupMode) {
+        switch (scanStartupMode) {
+            case EARLIEST_OFFSET:
+                return StartupMode.EARLIEST;
+            case LATEST_OFFSET:
+                return StartupMode.LATEST;
+            case GROUP_OFFSETS:
+                return StartupMode.GROUP_OFFSETS;
+            case SPECIFIC_OFFSETS:
+                return StartupMode.SPECIFIC_OFFSETS;
+            case TIMESTAMP:
+                return StartupMode.TIMESTAMP;
+
+            default:
+                throw new TableException(
+                        "Unsupported startup mode. Validator should have checked that.");
+        }
+    }
+
     public static Properties getKafkaProperties(Map<String, String> tableOptions) {
         final Properties kafkaProperties = new Properties();
 

Reply via email to