asfgit closed pull request #7087: [FLINK-10843] [connectors] Make Kafka table 
factory versioning more flexible
URL: https://github.com/apache/flink/pull/7087
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 5f1112706ce..effd913707e 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -40,14 +40,15 @@ The following table list all available connectors and 
formats. Their mutual comp
 
 ### Connectors
 
-| Name              | Version       | Maven dependency             | SQL 
Client JAR         |
-| :---------------- | :------------ | :--------------------------- | 
:----------------------|
-| Filesystem        |               | Built-in                     | Built-in  
             |
-| Elasticsearch     | 6             | `flink-connector-elasticsearch6` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
-| Apache Kafka      | 0.8           | `flink-connector-kafka-0.8`  | Not 
available          |
-| Apache Kafka      | 0.9           | `flink-connector-kafka-0.9`  | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
-| Apache Kafka      | 0.10          | `flink-connector-kafka-0.10` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
-| Apache Kafka      | 0.11          | `flink-connector-kafka-0.11` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Name              | Version             | Maven dependency             | SQL 
Client JAR         |
+| :---------------- | :------------------ | :--------------------------- | 
:----------------------|
+| Filesystem        |                     | Built-in                     | 
Built-in               |
+| Elasticsearch     | 6                   | `flink-connector-elasticsearch6` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka      | 0.8                 | `flink-connector-kafka-0.8`  | Not 
available          |
+| Apache Kafka      | 0.9                 | `flink-connector-kafka-0.9`  | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka      | 0.10                | `flink-connector-kafka-0.10` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka      | 0.11                | `flink-connector-kafka-0.11` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka      | 0.11+ (`universal`) | `flink-connector-kafka`      | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
 
 ### Formats
 
@@ -524,7 +525,8 @@ The Kafka connector allows for reading and writing from and 
to an Apache Kafka t
 {% highlight java %}
 .connect(
   new Kafka()
-    .version("0.11")    // required: valid connector versions are "0.8", 
"0.9", "0.10", and "0.11"
+    .version("0.11")    // required: valid connector versions are
+                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
     .topic("...")       // required: topic name from which the table is read
 
     // optional: connector specific properties
@@ -549,7 +551,8 @@ The Kafka connector allows for reading and writing from and 
to an Apache Kafka t
 {% highlight yaml %}
 connector:
   type: kafka
-  version: "0.11"     # required: valid connector versions are "0.8", "0.9", 
"0.10", and "0.11"
+  version: "0.11"     # required: valid connector versions are
+                      #   "0.8", "0.9", "0.10", "0.11", and "universal"
   topic: ...          # required: topic name from which the table is read
 
   properties:         # optional: connector specific properties
@@ -583,7 +586,9 @@ connector:
 
 **Consistency guarantees:** By default, a Kafka sink ingests data with 
at-least-once guarantees into a Kafka topic if the query is executed with 
[checkpointing enabled]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing).
 
-**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp 
as metadata that specifies when the record was written into the Kafka topic. 
These timestamps can be used for a [rowtime 
attribute](connect.html#defining-the-schema) by selecting `timestamps: 
from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively. 
+**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp 
as metadata that specifies when the record was written into the Kafka topic. 
These timestamps can be used for a [rowtime 
attribute](connect.html#defining-the-schema) by selecting `timestamps: 
from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively.
+
+**Kafka 0.11+ Versioning:** Since Flink 1.7, the Kafka connector definition 
should be independent of a hard-coded Kafka version. Use the connector version 
`universal` as a wildcard for Flink's Kafka connector that is compatible with 
all Kafka versions starting from 0.11.
 
 Make sure to add the version-specific Kafka dependency. In addition, a 
corresponding format needs to be specified for reading and writing rows from 
and to Kafka.
 
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index 86d7ef6d35b..0dbbbeb348a 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -21,9 +21,7 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -40,7 +38,7 @@
        public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
        public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
        public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
-       public static final String CONNECTOR_VERSION_VALUE_20 = "2.0";
+       public static final String CONNECTOR_VERSION_VALUE_UNIVERSAL = 
"universal";
        public static final String CONNECTOR_TOPIC = "connector.topic";
        public static final String CONNECTOR_STARTUP_MODE = 
"connector.startup-mode";
        public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = 
"earliest-offset";
@@ -64,7 +62,7 @@ public void validate(DescriptorProperties properties) {
                super.validate(properties);
                properties.validateValue(CONNECTOR_TYPE, 
CONNECTOR_TYPE_VALUE_KAFKA, false);
 
-               validateVersion(properties);
+               properties.validateString(CONNECTOR_TOPIC, false, 1, 
Integer.MAX_VALUE);
 
                validateStartupMode(properties);
 
@@ -73,17 +71,6 @@ public void validate(DescriptorProperties properties) {
                validateSinkPartitioner(properties);
        }
 
-       private void validateVersion(DescriptorProperties properties) {
-               final List<String> versions = Arrays.asList(
-                       CONNECTOR_VERSION_VALUE_08,
-                       CONNECTOR_VERSION_VALUE_09,
-                       CONNECTOR_VERSION_VALUE_010,
-                       CONNECTOR_VERSION_VALUE_011,
-                       CONNECTOR_VERSION_VALUE_20);
-               properties.validateEnumValues(CONNECTOR_VERSION, false, 
versions);
-               properties.validateString(CONNECTOR_TOPIC, false, 1, 
Integer.MAX_VALUE);
-       }
-
        private void validateStartupMode(DescriptorProperties properties) {
                final Map<String, Consumer<String>> specificOffsetValidators = 
new HashMap<>();
                specificOffsetValidators.put(
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
index b0dfc54e671..2b498674a89 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
@@ -39,7 +39,7 @@
 
        @Override
        protected String kafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_20;
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
index 5043880b21e..4d843bcd82e 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
@@ -40,7 +40,7 @@
 
        @Override
        protected String getKafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_20;
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
        }
 
        @Override
diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh 
b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
index 3e95b405b10..468f0587db7 100644
--- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
@@ -21,6 +21,7 @@ KAFKA_CONNECTOR_VERSION="$1"
 KAFKA_VERSION="$2"
 CONFLUENT_VERSION="$3"
 CONFLUENT_MAJOR_VERSION="$4"
+KAFKA_SQL_VERSION="$5"
 
 source "$(dirname "$0")"/kafka-common.sh $2 $3 $4
 
@@ -64,7 +65,7 @@ function get_kafka_json_source_schema {
         type: ROW<type VARCHAR, message VARCHAR>
     connector:
       type: kafka
-      version: "$KAFKA_CONNECTOR_VERSION"
+      version: "$KAFKA_SQL_VERSION"
       topic: $topicName
       startup-mode: earliest-offset
       properties:
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 74192b40dfb..5dd68838ba7 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -23,9 +23,15 @@ KAFKA_CONNECTOR_VERSION="2.0"
 KAFKA_VERSION="2.0.0"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
+KAFKA_SQL_VERSION="universal"
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION 
$KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
+source "$(dirname "$0")"/kafka_sql_common.sh \
+  $KAFKA_CONNECTOR_VERSION \
+  $KAFKA_VERSION \
+  $CONFLUENT_VERSION \
+  $CONFLUENT_MAJOR_VERSION \
+  $KAFKA_SQL_VERSION
 source "$(dirname "$0")"/elasticsearch-common.sh
 
 SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
index a05dc050a60..94e89a2b1e8 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
@@ -19,4 +19,4 @@
 
 set -Eeuo pipefail
 
-source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 
"kafka"
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 
"kafka" "universal"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
index c710abc72bc..66bef6623a1 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
@@ -19,4 +19,4 @@
 
 set -Eeuo pipefail
 
-source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 
3.2 "kafka-0.10"
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 
3.2 "kafka-0.10" "0.10"
diff --git 
a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
index 149c86f570e..08ed59b419c 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
@@ -22,9 +22,15 @@ KAFKA_VERSION="$2"
 CONFLUENT_VERSION="$3"
 CONFLUENT_MAJOR_VERSION="$4"
 KAFKA_SQL_JAR="$5"
+KAFKA_SQL_VERSION="$6"
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION 
$KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
+source "$(dirname "$0")"/kafka_sql_common.sh \
+  $KAFKA_CONNECTOR_VERSION \
+  $KAFKA_VERSION \
+  $CONFLUENT_VERSION \
+  $CONFLUENT_MAJOR_VERSION \
+  $KAFKA_SQL_VERSION
 
 
################################################################################
 # Prepare connectors
@@ -98,7 +104,7 @@ cat >> $SQL_CONF << EOF
         type: BIGINT
     connector:
       type: kafka
-      version: "$KAFKA_CONNECTOR_VERSION"
+      version: "$KAFKA_SQL_VERSION"
       topic: test-avro
       startup-mode: earliest-offset
       properties:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to