[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266560#comment-16266560
 ] 

ASF GitHub Bot commented on FLINK-4822:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5050#discussion_r153141966
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 ---
    @@ -148,4 +171,17 @@ public static Long 
getOffsetFromZooKeeper(CuratorFramework curatorClient, String
                        }
                }
        }
    +
    +   public static void registerPartitionOwnership(CuratorFramework 
curatorClient, String groupId, String consumerId, String topic, int partition, 
String taskId) throws Exception {
    +           String path = "/consumers/" + groupId + "/owners/" + topic + 
"/" + Integer.toString(partition);
    +           // register with task info that we can read from zookeeper 
which taskmanager consume the right topic
    +           String info = consumerId + "_" + taskId;
    +           try {
    +                   if (curatorClient.checkExists().forPath(path) == null) {
    +                           
curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,
 info.getBytes());
    +                   }
    +           } catch (KeeperException.NodeExistsException e) {
    +                   LOG.warn("NodeExists for {}", e);
    --- End diff --
    
    Also, shouldn't the log arguments be be `("Node exists for {}", consumerId, 
e)`?


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-4822
>                 URL: https://issues.apache.org/jira/browse/FLINK-4822
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to