This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new e591455 [FLINK-24087][connector-kafka] Avoid importing Table API
classes for DataStream API programs
e591455 is described below
commit e591455f4d6a5b93899069f323a5947dc5958b03
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Aug 31 23:39:37 2021 +0800
[FLINK-24087][connector-kafka] Avoid importing Table API classes for
DataStream API programs
This closes #17082.
---
.../connectors/kafka/config/StartupMode.java | 21 ------------------
.../kafka/table/KafkaConnectorOptionsUtil.java | 25 +++++++++++++++++++++-
2 files changed, 24 insertions(+), 22 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index 2efe0d2..0aa43ec 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -19,8 +19,6 @@ package org.apache.flink.streaming.connectors.kafka.config;
import org.apache.flink.annotation.Internal;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
-import
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
-import org.apache.flink.table.api.TableException;
/** Startup modes for the Kafka Consumer. */
@Internal
@@ -58,25 +56,6 @@ public enum StartupMode {
this.stateSentinel = stateSentinel;
}
- public static StartupMode fromOption(ScanStartupMode scanStartupMode) {
- switch (scanStartupMode) {
- case EARLIEST_OFFSET:
- return StartupMode.EARLIEST;
- case LATEST_OFFSET:
- return StartupMode.LATEST;
- case GROUP_OFFSETS:
- return StartupMode.GROUP_OFFSETS;
- case SPECIFIC_OFFSETS:
- return StartupMode.SPECIFIC_OFFSETS;
- case TIMESTAMP:
- return StartupMode.TIMESTAMP;
-
- default:
- throw new TableException(
- "Unsupported startup mode. Validator should have
checked that.");
- }
- }
-
public long getStateSentinel() {
return stateSentinel;
}
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index c8ea244..5514a5c 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -224,7 +224,7 @@ class KafkaConnectorOptionsUtil {
final StartupMode startupMode =
tableOptions
.getOptional(SCAN_STARTUP_MODE)
- .map(StartupMode::fromOption)
+ .map(KafkaConnectorOptionsUtil::fromOption)
.orElse(StartupMode.GROUP_OFFSETS);
if (startupMode == StartupMode.SPECIFIC_OFFSETS) {
// It will be refactored after support specific offset for
multiple topics in
@@ -257,6 +257,29 @@ class KafkaConnectorOptionsUtil {
});
}
+ /**
+ * Returns the {@link StartupMode} of Kafka Consumer by passed-in
table-specific {@link
+ * ScanStartupMode}.
+ */
+ private static StartupMode fromOption(ScanStartupMode scanStartupMode) {
+ switch (scanStartupMode) {
+ case EARLIEST_OFFSET:
+ return StartupMode.EARLIEST;
+ case LATEST_OFFSET:
+ return StartupMode.LATEST;
+ case GROUP_OFFSETS:
+ return StartupMode.GROUP_OFFSETS;
+ case SPECIFIC_OFFSETS:
+ return StartupMode.SPECIFIC_OFFSETS;
+ case TIMESTAMP:
+ return StartupMode.TIMESTAMP;
+
+ default:
+ throw new TableException(
+ "Unsupported startup mode. Validator should have
checked that.");
+ }
+ }
+
public static Properties getKafkaProperties(Map<String, String>
tableOptions) {
final Properties kafkaProperties = new Properties();