This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 020f329 [FLINK-8290] Allow setting clientId in flink-connector-kafka-0.8
020f329 is described below
commit 020f3299ebccc36297a0d3db425cc5f44c70a8b7
Author: maqingxiang-it <[email protected]>
AuthorDate: Tue Jan 16 21:42:45 2018 +0800
[FLINK-8290] Allow setting clientId in flink-connector-kafka-0.8
---
.../streaming/connectors/kafka/internals/SimpleConsumerThread.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 4c704c3..1fdff9d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -94,6 +94,7 @@ class SimpleConsumerThread<T> extends Thread {
private final int fetchSize;
private final int bufferSize;
private final int reconnectLimit;
+ private final String clientId;
// exceptions are thrown locally
public SimpleConsumerThread(
@@ -123,6 +124,8 @@ class SimpleConsumerThread<T> extends Thread {
this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+ String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
+ this.clientId = config.getProperty("client.id", groupId);
}
public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
@@ -138,8 +141,6 @@ class SimpleConsumerThread<T> extends Thread {
LOG.info("Starting to fetch from {}", this.partitions);
// set up the config values
- final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
-
try {
// create the Kafka consumer that we actually use for fetching
consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);