[
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13912363#comment-13912363
]
Jun Rao commented on KAFKA-1227:
--------------------------------
Accumulated some more review comments.
20. Regarding VIP. We need to think through another corner case. When the
brokers for all replicas of an existing topic die, the admin can start new
brokers with existing broker ids to start the topic from scratch. Those new
brokers can be added to the VIP. However, if the producer only uses the VIP
once during startup, there is no way for the producer to identify the new
brokers unless it's restarted.
21. AbstractConfig:
21.1 To be consistent, we probably should change the return value of the
following api from Long to long.
public Long getLong(String key)
21.2 Could we also add getDouble(String key)?
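A minimal sketch of what 21.1 and the getDouble() request could look like, assuming the parsed values are held in a map of boxed objects. ConfigSketch and its internals are hypothetical names for illustration, not the actual AbstractConfig code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AbstractConfig; only the typed getters matter here.
final class ConfigSketch {
    private final Map<String, Object> values;

    ConfigSketch(Map<String, Object> values) {
        this.values = new HashMap<>(values);
    }

    // Fail with a clear message instead of an NPE when unboxing a missing value.
    private Object get(String key) {
        Object value = values.get(key);
        if (value == null)
            throw new IllegalArgumentException("Unknown configuration '" + key + "'");
        return value;
    }

    // Primitive return type, as suggested in 21.1.
    long getLong(String key) {
        return (Long) get(key);
    }

    // The additional getter requested above.
    double getDouble(String key) {
        return (Double) get(key);
    }
}
```

One thing to watch when returning primitives: a missing key can no longer be signalled with null, so the sketch throws explicitly.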
22. Metadata.fetch(): I am a bit puzzled by the following code. Not sure what
the intention for the handling of InterruptedException is. This code will be
called in the producer client thread, which will probably be interrupted during
shutdown. When that happens, we probably should just throw the
InterruptedException back to the caller. Otherwise, the producer may have to
wait metadataFetchTimeoutMs before it can shut down. Also, it seems that we
need to adjust maxWaitMs in each loop. Otherwise, we will be waiting longer
than we should.
    try {
        wait(maxWaitMs);
    } catch (InterruptedException e) { /* this is fine, just try again */
    }
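A rough sketch of the two changes suggested in 22, assuming the metadata object tracks a version counter that is bumped on each update. MetadataWaitSketch, awaitUpdate() and the use of TimeoutException are illustrative assumptions, not the code in the patch:

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the waiting logic in Metadata.fetch().
final class MetadataWaitSketch {
    private int version = 0;

    // Called when new metadata arrives; wakes up any waiting threads.
    synchronized void update() {
        version++;
        notifyAll();
    }

    // Waits until the version exceeds lastVersion, for at most maxWaitMs in total.
    // InterruptedException is propagated so a shutdown is not delayed.
    synchronized int awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadlineNs = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (version <= lastVersion) {
            // Recompute the remaining time on every iteration so that spurious
            // wakeups do not extend the overall wait.
            long remainingMs = (deadlineNs - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0)
                throw new TimeoutException("No metadata update within " + maxWaitMs + " ms");
            wait(remainingMs);
        }
        return version;
    }
}
```

The explicit check for remainingMs <= 0 also matters because wait(0) blocks indefinitely.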
23. Sender: We learned in KAFKA-1228 that it's not enough to just handle
IOException in the following code. UnresolvedAddressException is an unchecked
exception (it extends IllegalArgumentException), not an IOException.
    private void initiateConnect(Node node, long now) {
        try {
            selector.connect(node.id(), new InetSocketAddress(node.host(),
                node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
            this.nodeStates.connecting(node.id(), now);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            nodeStates.disconnected(node.id());
            /* maybe the problem is our metadata, update it */
            metadata.forceUpdate();
        }
    }
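One possible shape for the fix in 23, shown against a plain java.nio SocketChannel rather than the producer's Selector. ConnectSketch and tryConnect() are hypothetical names, and the channel is closed right away because the sketch only illustrates the failure path:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

// Hypothetical helper showing that both exception types need handling.
final class ConnectSketch {
    // Returns true if the connect was initiated, false if the attempt failed
    // and the caller should retry after the backoff.
    static boolean tryConnect(InetSocketAddress address) {
        try (SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            // Throws UnresolvedAddressException (unchecked) if the host name
            // could not be resolved, and IOException for other I/O failures.
            channel.connect(address);
            return true;
        } catch (UnresolvedAddressException | IOException e) {
            // Treat both the same way: mark the node disconnected and
            // refresh metadata in the real code.
            return false;
        }
    }
}
```

Calling tryConnect(InetSocketAddress.createUnresolved("broker.invalid", 9092)) exercises the unresolved-address path without any network access.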
24. BufferPool.allocate(): Could we add some comments on when this call is
blocked?
25. MemoryRecords: The following constructor doesn't seem to be used.
public MemoryRecords(int size)
26. Spellings:
26.1 ConfigDef: seperated list, programmatically
26.2 Metadata: ellapsed
> Code dump of new producer
> -------------------------
>
> Key: KAFKA-1227
> URL: https://issues.apache.org/jira/browse/KAFKA-1227
> Project: Kafka
> Issue Type: New Feature
> Reporter: Jay Kreps
> Assignee: Jay Kreps
> Attachments: KAFKA-1227.patch, KAFKA-1227.patch, KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series
> of post-commit reviews to get it into shape. This bug tracks just the code
> dump.