This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f93346c [FLINK-16125][connecotr/kafka] Remove Kafka connector
property zookeeper.connect and clear documentation because Kafka 0.8 connector
has been removed.
f93346c is described below
commit f93346c93c6a841cdce576d48a0b5ca8076cc195
Author: Qingsheng Ren <[email protected]>
AuthorDate: Tue Mar 17 12:26:50 2020 +0800
[FLINK-16125][connecotr/kafka] Remove Kafka connector property
zookeeper.connect and clear documentation because Kafka 0.8 connector has been
removed.
---
docs/dev/table/connect.md | 9 ---------
docs/dev/table/connect.zh.md | 9 ---------
docs/dev/table/hive/hive_catalog.md | 2 --
docs/dev/table/hive/hive_catalog.zh.md | 2 --
docs/dev/table/sqlClient.md | 2 --
docs/dev/table/sqlClient.zh.md | 2 --
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 1 -
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 3 +--
.../apache/flink/table/descriptors/KafkaValidator.java | 8 ++------
.../streaming/connectors/kafka/KafkaConsumerTestBase.java | 1 -
.../kafka/KafkaTableSourceSinkFactoryTestBase.java | 15 ++++-----------
.../streaming/connectors/kafka/KafkaTableTestBase.java | 3 ---
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 1 -
.../schema/registry/test/TestAvroConsumerConfluent.java | 4 +---
.../flink/tests/util/kafka/StreamingKafkaITCase.java | 1 -
.../src/test/resources/kafka_json_source_schema.yaml | 3 ---
.../flink/streaming/kafka/test/base/KafkaExampleUtil.java | 4 ++--
.../apache/flink/streaming/kafka/test/KafkaExample.java | 2 +-
.../flink/streaming/kafka/test/Kafka010Example.java | 2 +-
.../flink/streaming/kafka/test/Kafka011Example.java | 2 +-
flink-end-to-end-tests/test-scripts/kafka_sql_common.sh | 1 -
.../test-scripts/test_confluent_schema_registry.sh | 2 +-
flink-python/pyflink/table/table_environment.py | 1 -
flink-python/pyflink/table/tests/test_descriptor.py | 4 +---
.../flink/table/api/java/StreamTableEnvironment.java | 1 -
.../java/org/apache/flink/table/api/TableEnvironment.java | 1 -
.../flink/table/api/scala/StreamTableEnvironment.scala | 1 -
27 files changed, 15 insertions(+), 72 deletions(-)
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 204c4cb..2bb8017 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -159,7 +159,6 @@ CREATE TABLE MyUserTable (
'connector.version' = '0.10',
'connector.topic' = 'topic_name',
'connector.startup-mode' = 'earliest-offset',
- 'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
-- declare a format for this system
@@ -177,7 +176,6 @@ tableEnvironment
.version("0.10")
.topic("test-input")
.startFromEarliest()
- .property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
@@ -211,7 +209,6 @@ table_environment \
.version("0.10")
.topic("test-input")
.start_from_earliest()
- .property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
) \
.with_format( # declare a format for this system
@@ -246,7 +243,6 @@ tables:
topic: test-input
startup-mode: earliest-offset
properties:
- zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
# declare a format for this system
@@ -773,8 +769,6 @@ CREATE TABLE MyUserTable (
'connector.topic' = 'topic_name', -- required: topic name from which the
table is read
- -- required: specify the ZooKeeper connection string
- 'connector.properties.zookeeper.connect' = 'localhost:2181',
-- required: specify the Kafka server connection string
'connector.properties.bootstrap.servers' = 'localhost:9092',
-- required for Kafka source, optional for Kafka sink, specify consumer group
@@ -814,7 +808,6 @@ CREATE TABLE MyUserTable (
.topic("...") // required: topic name from which the table is read
// optional: connector specific properties
- .property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")
@@ -844,7 +837,6 @@ CREATE TABLE MyUserTable (
.topic("...") # required: topic name from which the table is read
# optional: connector specific properties
- .property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")
@@ -874,7 +866,6 @@ connector:
topic: ... # required: topic name from which the table is read
properties:
- zookeeper.connect: localhost:2181 # required: specify the ZooKeeper
connection string
bootstrap.servers: localhost:9092 # required: specify the Kafka server
connection string
group.id: testGroup # optional: required in Kafka consumer,
specify consumer group
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 66b8d9a..720ab54 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -159,7 +159,6 @@ CREATE TABLE MyUserTable (
'connector.version' = '0.10',
'connector.topic' = 'topic_name',
'connector.startup-mode' = 'earliest-offset',
- 'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
-- declare a format for this system
@@ -177,7 +176,6 @@ tableEnvironment
.version("0.10")
.topic("test-input")
.startFromEarliest()
- .property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
@@ -211,7 +209,6 @@ table_environment \
.version("0.10")
.topic("test-input")
.start_from_earliest()
- .property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
) \
.with_format( # declare a format for this system
@@ -246,7 +243,6 @@ tables:
topic: test-input
startup-mode: earliest-offset
properties:
- zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
# declare a format for this system
@@ -773,8 +769,6 @@ CREATE TABLE MyUserTable (
'connector.topic' = 'topic_name', -- required: topic name from which the
table is read
- -- required: specify the ZooKeeper connection string
- 'connector.properties.zookeeper.connect' = 'localhost:2181',
-- required: specify the Kafka server connection string
'connector.properties.bootstrap.servers' = 'localhost:9092',
-- required for Kafka source, optional for Kafka sink, specify consumer group
@@ -814,7 +808,6 @@ CREATE TABLE MyUserTable (
.topic("...") // required: topic name from which the table is read
// optional: connector specific properties
- .property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")
@@ -844,7 +837,6 @@ CREATE TABLE MyUserTable (
.topic("...") # required: topic name from which the table is read
# optional: connector specific properties
- .property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")
@@ -874,7 +866,6 @@ connector:
topic: ... # required: topic name from which the table is read
properties:
- zookeeper.connect: localhost:2181 # required: specify the ZooKeeper
connection string
bootstrap.servers: localhost:9092 # required: specify the Kafka server
connection string
group.id: testGroup # optional: required in Kafka consumer,
specify consumer group
diff --git a/docs/dev/table/hive/hive_catalog.md
b/docs/dev/table/hive/hive_catalog.md
index d2e7d5a..d907703 100644
--- a/docs/dev/table/hive/hive_catalog.md
+++ b/docs/dev/table/hive/hive_catalog.md
@@ -190,7 +190,6 @@ Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH
(
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test',
- 'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'update-mode' = 'append'
@@ -227,7 +226,6 @@ Location: ......
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector.properties.bootstrap.servers localhost:9092
- flink.connector.properties.zookeeper.connect localhost:2181
flink.connector.topic test
flink.connector.type kafka
flink.connector.version universal
diff --git a/docs/dev/table/hive/hive_catalog.zh.md
b/docs/dev/table/hive/hive_catalog.zh.md
index d2e7d5a..d907703 100644
--- a/docs/dev/table/hive/hive_catalog.zh.md
+++ b/docs/dev/table/hive/hive_catalog.zh.md
@@ -190,7 +190,6 @@ Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH
(
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test',
- 'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'update-mode' = 'append'
@@ -227,7 +226,6 @@ Location: ......
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector.properties.bootstrap.servers localhost:9092
- flink.connector.properties.zookeeper.connect localhost:2181
flink.connector.topic test
flink.connector.type kafka
flink.connector.version universal
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index ed4ff80..9ea1bb8 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -317,7 +317,6 @@ tables:
topic: TaxiRides
startup-mode: earliest-offset
properties:
- zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
@@ -483,7 +482,6 @@ tables:
version: "0.11"
topic: OutputTopic
properties:
- zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index 93d506b..642f448 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -317,7 +317,6 @@ tables:
topic: TaxiRides
startup-mode: earliest-offset
properties:
- zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
@@ -483,7 +482,6 @@ tables:
version: "0.11"
topic: OutputTopic
properties:
- zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
diff --git
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 64649ee..322c3aa 100644
---
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -253,7 +253,6 @@ public class KafkaTestEnvironmentImpl extends
KafkaTestEnvironment {
LOG.info("ZK and KafkaServer started.");
standardProps = new Properties();
- standardProps.setProperty("zookeeper.connect",
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers",
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("enable.auto.commit", "false");
diff --git
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index a3982ba..478ce38 100644
---
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -134,7 +134,6 @@ public class KafkaTestEnvironmentImpl extends
KafkaTestEnvironment {
LOG.info("ZK and KafkaServer started.");
standardProps = new Properties();
- standardProps.setProperty("zookeeper.connect",
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers",
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("enable.auto.commit", "false");
@@ -393,8 +392,8 @@ public class KafkaTestEnvironmentImpl extends
KafkaTestEnvironment {
kafkaProperties.put("advertised.host.name", KAFKA_HOST);
kafkaProperties.put("broker.id", Integer.toString(brokerId));
kafkaProperties.put("log.dir", tmpFolder.toString());
- kafkaProperties.put("zookeeper.connect",
zookeeperConnectionString);
kafkaProperties.put("message.max.bytes", String.valueOf(50 *
1024 * 1024));
+ kafkaProperties.put("zookeeper.connect",
zookeeperConnectionString);
kafkaProperties.put("replica.fetch.max.bytes",
String.valueOf(50 * 1024 * 1024));
kafkaProperties.put("transaction.max.timeout.ms",
Integer.toString(1000 * 60 * 60 * 2)); // 2hours
diff --git
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index f55bc1e..158417a 100644
---
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -54,7 +54,6 @@ public class KafkaValidator extends
ConnectorDescriptorValidator {
public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
public static final String CONNECTOR_STARTUP_TIMESTAMP_MILLIS =
"connector.startup-timestamp-millis";
public static final String CONNECTOR_PROPERTIES =
"connector.properties";
- public static final String CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT =
"connector.properties.zookeeper.connect";
public static final String CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER =
"connector.properties.bootstrap.servers";
public static final String CONNECTOR_PROPERTIES_GROUP_ID =
"connector.properties.group.id";
public static final String CONNECTOR_PROPERTIES_KEY = "key";
@@ -136,11 +135,9 @@ public class KafkaValidator extends
ConnectorDescriptorValidator {
}
private void validateKafkaProperties(DescriptorProperties properties) {
- if
(properties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT)
- ||
properties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER)
+ if
(properties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER)
||
properties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID)) {
-
properties.validateString(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT, false);
properties.validateString(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER, false);
properties.validateString(CONNECTOR_PROPERTIES_GROUP_ID, true);
@@ -235,8 +232,7 @@ public class KafkaValidator extends
ConnectorDescriptorValidator {
}
public static boolean hasConciseKafkaProperties(DescriptorProperties
descriptorProperties) {
- return
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT) ||
-
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) ||
+ return
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) ||
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID);
}
}
diff --git
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2cf999b..b788fb8 100644
---
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -169,7 +169,6 @@ public abstract class KafkaConsumerTestBase extends
KafkaTestBaseWithFlink {
// use wrong ports for the consumers
properties.setProperty("bootstrap.servers",
"localhost:80");
- properties.setProperty("zookeeper.connect",
"localhost:80");
properties.setProperty("group.id", "test");
properties.setProperty("request.timeout.ms", "3000");
// let the test fail fast
properties.setProperty("socket.timeout.ms", "3000");
diff --git
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 0875c28..d8eb011 100644
---
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -93,7 +93,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase
extends TestLogger {
private static final Properties KAFKA_PROPERTIES = new Properties();
static {
- KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
KAFKA_PROPERTIES.setProperty("group.id", "dummy");
KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
}
@@ -224,7 +223,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase
extends TestLogger {
// use legacy properties
legacyPropertiesMap.remove("connector.specific-offsets");
-
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
legacyPropertiesMap.remove("connector.properties.group.id");
@@ -236,12 +234,10 @@ public abstract class KafkaTableSourceSinkFactoryTestBase
extends TestLogger {
legacyPropertiesMap.put("connector.specific-offsets.0.offset",
"100");
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
legacyPropertiesMap.put("connector.specific-offsets.1.offset",
"123");
- legacyPropertiesMap.put("connector.properties.0.key",
"zookeeper.connect");
+ legacyPropertiesMap.put("connector.properties.0.key",
"bootstrap.servers");
legacyPropertiesMap.put("connector.properties.0.value",
"dummy");
- legacyPropertiesMap.put("connector.properties.1.key",
"bootstrap.servers");
+ legacyPropertiesMap.put("connector.properties.1.key",
"group.id");
legacyPropertiesMap.put("connector.properties.1.value",
"dummy");
- legacyPropertiesMap.put("connector.properties.2.key",
"group.id");
- legacyPropertiesMap.put("connector.properties.2.value",
"dummy");
final TableSource<?> actualSource =
TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
.createStreamTableSource(legacyPropertiesMap);
@@ -330,7 +326,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase
extends TestLogger {
// use legacy properties
legacyPropertiesMap.remove("connector.specific-offsets");
-
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
legacyPropertiesMap.remove("connector.properties.group.id");
@@ -342,12 +337,10 @@ public abstract class KafkaTableSourceSinkFactoryTestBase
extends TestLogger {
legacyPropertiesMap.put("connector.specific-offsets.0.offset",
"100");
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
legacyPropertiesMap.put("connector.specific-offsets.1.offset",
"123");
- legacyPropertiesMap.put("connector.properties.0.key",
"zookeeper.connect");
+ legacyPropertiesMap.put("connector.properties.0.key",
"bootstrap.servers");
legacyPropertiesMap.put("connector.properties.0.value",
"dummy");
- legacyPropertiesMap.put("connector.properties.1.key",
"bootstrap.servers");
+ legacyPropertiesMap.put("connector.properties.1.key",
"group.id");
legacyPropertiesMap.put("connector.properties.1.value",
"dummy");
- legacyPropertiesMap.put("connector.properties.2.key",
"group.id");
- legacyPropertiesMap.put("connector.properties.2.value",
"dummy");
final TableSink<?> actualSink =
TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
.createStreamTableSink(legacyPropertiesMap);
diff --git
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
index bd526e9..a3d0acd 100644
---
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
+++
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
@@ -66,7 +66,6 @@ public abstract class KafkaTableTestBase extends
KafkaTestBase {
// ---------- Produce an event time stream into Kafka
-------------------
String groupId = standardProps.getProperty("group.id");
- String zk = standardProps.getProperty("zookeeper.connect");
String bootstraps =
standardProps.getProperty("bootstrap.servers");
// TODO: use DDL to register Kafka once FLINK-15282 is fixed.
@@ -83,7 +82,6 @@ public abstract class KafkaTableTestBase extends
KafkaTestBase {
properties.put("connector.type", "kafka");
properties.put("connector.topic", topic);
properties.put("connector.version", kafkaVersion());
- properties.put("connector.properties.zookeeper.connect", zk);
properties.put("connector.properties.bootstrap.servers",
bootstraps);
properties.put("connector.properties.group.id", groupId);
properties.put("connector.startup-mode", "earliest-offset");
@@ -112,7 +110,6 @@ public abstract class KafkaTableTestBase extends
KafkaTestBase {
// " 'connector.type' = 'kafka',\n" +
// " 'connector.topic' = '" + topic + "',\n" +
// " 'connector.version' = 'universal',\n" +
-// " 'connector.properties.zookeeper.connect' = '" + zk +
"',\n" +
// " 'connector.properties.bootstrap.servers' = '" +
bootstraps + "',\n" +
// " 'connector.properties.group.id' = '" + groupId + "',
\n" +
// " 'connector.startup-mode' = 'earliest-offset', \n" +
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 15b1594..16cb724 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -137,7 +137,6 @@ public class KafkaTestEnvironmentImpl extends
KafkaTestEnvironment {
LOG.info("ZK and KafkaServer started.");
standardProps = new Properties();
- standardProps.setProperty("zookeeper.connect",
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers",
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("enable.auto.commit", "false");
diff --git
a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
index 55549de..dda4617 100644
---
a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
+++
b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -37,7 +37,7 @@ import java.util.Properties;
* A simple example that shows how to read from and write to Kafka with
Confluent Schema Registry.
* This will read AVRO messages from the input topic, parse them into a POJO
type via checking the Schema by calling Schema registry.
* Then this example publish the POJO type to kafka by converting the POJO to
AVRO and verifying the schema.
- * --input-topic test-input --output-string-topic test-output
--output-avro-topic test-avro-output --output-subject --bootstrap.servers
localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url
http://localhost:8081 --group.id myconsumer
+ * --input-topic test-input --output-string-topic test-output
--output-avro-topic test-avro-output --output-subject --bootstrap.servers
localhost:9092 --schema-registry-url http://localhost:8081 --group.id myconsumer
*/
public class TestAvroConsumerConfluent {
@@ -49,14 +49,12 @@ public class TestAvroConsumerConfluent {
System.out.println("Missing parameters!\n" +
"Usage: Kafka --input-topic <topic>
--output-string-topic <topic> --output-avro-topic <topic> " +
"--bootstrap.servers <kafka brokers> " +
- "--zookeeper.connect <zk quorum> " +
"--schema-registry-url <confluent schema
registry> --group.id <some id>");
return;
}
Properties config = new Properties();
config.setProperty("bootstrap.servers",
parameterTool.getRequired("bootstrap.servers"));
config.setProperty("group.id",
parameterTool.getRequired("group.id"));
- config.setProperty("zookeeper.connect",
parameterTool.getRequired("zookeeper.connect"));
String schemaRegistryUrl =
parameterTool.getRequired("schema-registry-url");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index a7ffb14..22d8506 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -101,7 +101,6 @@ public class StreamingKafkaITCase extends TestLogger {
.addArgument("--output-topic", outputTopic)
.addArgument("--prefix", "PREFIX")
.addArgument("--bootstrap.servers",
kafka.getBootstrapServerAddresses().stream().map(address ->
address.getHostString() + ':' +
address.getPort()).collect(Collectors.joining(",")))
- .addArgument("--zookeeper.connect ",
kafka.getZookeeperAddress().getHostString() + ':' +
kafka.getZookeeperAddress().getPort())
.addArgument("--group.id", "myconsumer")
.addArgument("--auto.offset.reset", "earliest")
.addArgument("--transaction.timeout.ms",
"900000")
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml
index 6de0c71..600e3f1 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml
@@ -40,7 +40,6 @@ tables:
topic: $TOPIC_NAME
startup-mode: earliest-offset
properties:
- zookeeper.connect: $KAFKA_ZOOKEEPER_ADDRESS
bootstrap.servers: $KAFKA_BOOTSTRAP_SERVERS
format:
type: json
@@ -86,8 +85,6 @@ tables:
topic: test-avro
startup-mode: earliest-offset
properties:
- - key: zookeeper.connect
- value: $KAFKA_ZOOKEEPER_ADDRESS
- key: bootstrap.servers
value: $KAFKA_BOOTSTRAP_SERVERS
format:
diff --git
a/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
b/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
index 0e3c4ea..5505a7a 100644
---
a/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
+++
b/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
@@ -34,11 +34,11 @@ public class KafkaExampleUtil {
System.out.println("Missing parameters!\n" +
"Usage: Kafka --input-topic <topic>
--output-topic <topic> " +
"--bootstrap.servers <kafka brokers> " +
- "--zookeeper.connect <zk quorum> --group.id
<some id>");
+ "--group.id <some id>");
throw new Exception("Missing parameters!\n" +
"Usage: Kafka --input-topic <topic>
--output-topic <topic> " +
"--bootstrap.servers <kafka brokers> " +
- "--zookeeper.connect <zk quorum> --group.id
<some id>");
+ "--group.id <some id>");
}
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
index 3a3be93..f3c844c 100644
---
a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
+++
b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
@@ -40,7 +40,7 @@ import
org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
*
* <p>Example usage:
* --input-topic test-input --output-topic test-output --bootstrap.servers
localhost:9092
- * --zookeeper.connect localhost:2181 --group.id myconsumer
+ * --group.id myconsumer
*/
public class KafkaExample extends KafkaExampleUtil {
diff --git
a/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java
b/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java
index 0b97179..14c9493 100644
---
a/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java
+++
b/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java
@@ -38,7 +38,7 @@ import
org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
* the String messages are of formatted as a (word,frequency,timestamp) tuple.
*
* <p>Example usage:
- * --input-topic test-input --output-topic test-output --bootstrap.servers
localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ * --input-topic test-input --output-topic test-output --bootstrap.servers
localhost:9092 localhost:2181 --group.id myconsumer
*/
public class Kafka010Example {
diff --git
a/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java
b/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java
index 1f877c5..fafd307 100644
---
a/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java
+++
b/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java
@@ -38,7 +38,7 @@ import
org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
* the String messages are of formatted as a (word,frequency,timestamp) tuple.
*
* <p>Example usage:
- * --input-topic test-input --output-topic test-output --bootstrap.servers
localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ * --input-topic test-input --output-topic test-output --bootstrap.servers
localhost:9092 --group.id myconsumer
*/
public class Kafka011Example {
diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
index 560e43e..c7ed12a 100644
--- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
@@ -68,7 +68,6 @@ function get_kafka_json_source_schema {
topic: $topicName
startup-mode: earliest-offset
properties:
- zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
format:
type: json
diff --git
a/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
index 5d3e9e4..a023f39 100755
--- a/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
+++ b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
@@ -78,7 +78,7 @@ create_kafka_topic 1 1 test-avro-out
# Read Avro message from [test-avro-input], check the schema and send message
to [test-string-ou]
$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
--input-topic test-avro-input --output-string-topic test-string-out
--output-avro-topic test-avro-out --output-subject test-output-subject \
- --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181
--group.id myconsumer --auto.offset.reset earliest \
+ --bootstrap.servers localhost:9092 --group.id myconsumer --auto.offset.reset
earliest \
--schema-registry-url ${SCHEMA_REGISTRY_URL}
#echo "Reading messages from Kafka topic [test-string-ou] ..."
diff --git a/flink-python/pyflink/table/table_environment.py
b/flink-python/pyflink/table/table_environment.py
index 46aa70f..7acdaa0 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -522,7 +522,6 @@ class TableEnvironment(object):
... 'connector.type' = 'kafka',
... 'update-mode' = 'append',
... 'connector.topic' = 'xxx',
- ... 'connector.properties.zookeeper.connect' =
'localhost:2181',
... 'connector.properties.bootstrap.servers' = 'localhost:9092'
... )
... '''
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py
b/flink-python/pyflink/table/tests/test_descriptor.py
index 002f499..6894e10 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -61,12 +61,10 @@ class KafkaDescriptorTests(PyFlinkTestCase):
self.assertEqual(expected, properties)
def test_properties(self):
- kafka = Kafka().properties({"zookeeper.connect": "localhost:2181",
- "bootstrap.servers": "localhost:9092"})
+ kafka = Kafka().properties({"bootstrap.servers": "localhost:9092"})
properties = kafka.to_properties()
expected = {'connector.type': 'kafka',
- 'connector.properties.zookeeper.connect': 'localhost:2181',
'connector.properties.bootstrap.servers': 'localhost:9092',
'connector.property-version': '1'}
self.assertEqual(expected, properties)
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
index 364e619..74f0b65 100644
---
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
@@ -701,7 +701,6 @@ public interface StreamTableEnvironment extends
TableEnvironment {
* new Kafka()
* .version("0.11")
* .topic("clicks")
- * .property("zookeeper.connect", "localhost")
* .property("group.id", "click-group")
* .startFromEarliest())
* .withFormat(
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index f086dd8..c89d76a 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -689,7 +689,6 @@ public interface TableEnvironment {
* 'connector.type' = 'kafka',
* 'update-mode' = 'append',
* 'connector.topic' = 'xxx',
- * 'connector.properties.zookeeper.connect' =
'localhost:2181',
* 'connector.properties.bootstrap.servers' =
'localhost:9092',
* ...
* )";
diff --git
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 54d0c97..d569c83 100644
---
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -458,7 +458,6 @@ trait StreamTableEnvironment extends TableEnvironment {
* new Kafka()
* .version("0.11")
* .topic("clicks")
- * .property("zookeeper.connect", "localhost")
* .property("group.id", "click-group")
* .startFromEarliest())
* .withFormat(