This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new abbd53d KAFKA-6299; Fix AdminClient error handling when metadata
changes (#4295)
abbd53d is described below
commit abbd53da4ad98c3a95118c9770a9d247e54b0eef
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed May 9 14:27:28 2018 -0700
KAFKA-6299; Fix AdminClient error handling when metadata changes (#4295)
When AdminClient gets a NOT_CONTROLLER error, it should refresh its
metadata and retry the request, rather than making the end-user deal with
NotControllerException.
Move AdminClient's metadata management outside of NetworkClient and into
AdminMetadataManager. This will make it easier to do more sophisticated
metadata management in the future, such as implementing a NodeProvider which
fetches the leaders for topics.
Rather than manipulating newCalls directly, the AdminClient service thread
now drains it directly into pendingCalls. This minimizes the amount of locking
we have to do, since pendingCalls is only accessed from the service thread.
---
.../org/apache/kafka/clients/MetadataUpdater.java | 2 +-
.../kafka/clients/admin/KafkaAdminClient.java | 372 +++++++++++++--------
.../admin/internal/AdminMetadataManager.java | 247 ++++++++++++++
.../main/java/org/apache/kafka/common/Cluster.java | 3 +-
.../java/org/apache/kafka/clients/MockClient.java | 26 ++
.../clients/admin/AdminClientUnitTestEnv.java | 23 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 123 +++++--
.../apache/kafka/connect/util/TopicAdminTest.java | 3 +-
8 files changed, 626 insertions(+), 173 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 1267283..09ed995 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -29,7 +29,7 @@ import java.util.List;
* <p>
* This class is not thread-safe!
*/
-interface MetadataUpdater {
+public interface MetadataUpdater {
/**
* Gets the current cluster info without blocking.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d8c0bad..70e9fbd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -22,11 +22,11 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.internal.AdminMetadataManager;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -44,7 +44,6 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -186,9 +185,9 @@ public class KafkaAdminClient extends AdminClient {
private final Time time;
/**
- * The cluster metadata used by the KafkaClient.
+ * The cluster metadata manager used by the KafkaClient.
*/
- private final Metadata metadata;
+ private final AdminMetadataManager metadataManager;
/**
* The metrics for this KafkaAdminClient.
@@ -327,8 +326,9 @@ public class KafkaAdminClient extends AdminClient {
try {
// Since we only request node information, it's safe to pass true
for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
- Metadata metadata = new
Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
- config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
true);
+ AdminMetadataManager metadataManager = new
AdminMetadataManager(logContext, time,
+ config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+ config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
List<MetricsReporter> reporters =
config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
Map<String, String> metricTags =
Collections.singletonMap("client-id", clientId);
@@ -344,7 +344,7 @@ public class KafkaAdminClient extends AdminClient {
metrics, time, metricGrpPrefix, channelBuilder,
logContext);
networkClient = new NetworkClient(
selector,
- metadata,
+ metadataManager.updater(),
clientId,
1,
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
@@ -356,7 +356,7 @@ public class KafkaAdminClient extends AdminClient {
true,
apiVersions,
logContext);
- return new KafkaAdminClient(config, clientId, time, metadata,
metrics, networkClient,
+ return new KafkaAdminClient(config, clientId, time,
metadataManager, metrics, networkClient,
timeoutProcessorFactory, logContext);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
@@ -367,35 +367,39 @@ public class KafkaAdminClient extends AdminClient {
}
}
- static KafkaAdminClient createInternal(AdminClientConfig config,
KafkaClient client, Metadata metadata, Time time) {
+ static KafkaAdminClient createInternal(AdminClientConfig config,
KafkaClient client, Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);
try {
metrics = new Metrics(new MetricConfig(), new
LinkedList<MetricsReporter>(), time);
- return new KafkaAdminClient(config, clientId, time, metadata,
metrics, client, null,
- createLogContext(clientId));
+ LogContext logContext = createLogContext(clientId);
+ AdminMetadataManager metadataManager = new
AdminMetadataManager(logContext, time,
+ config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+ config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+ return new KafkaAdminClient(config, clientId, time,
metadataManager, metrics,
+ client, null, logContext);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
throw new KafkaException("Failed create new KafkaAdminClient",
exc);
}
}
- private static LogContext createLogContext(String clientId) {
+ static LogContext createLogContext(String clientId) {
return new LogContext("[AdminClient clientId=" + clientId + "] ");
}
- private KafkaAdminClient(AdminClientConfig config, String clientId, Time
time, Metadata metadata,
- Metrics metrics, KafkaClient client,
TimeoutProcessorFactory timeoutProcessorFactory,
- LogContext logContext) {
+ private KafkaAdminClient(AdminClientConfig config, String clientId, Time
time,
+ AdminMetadataManager metadataManager, Metrics metrics,
KafkaClient client,
+ TimeoutProcessorFactory timeoutProcessorFactory, LogContext
logContext) {
this.defaultTimeoutMs =
config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.clientId = clientId;
this.log = logContext.logger(KafkaAdminClient.class);
this.time = time;
- this.metadata = metadata;
+ this.metadataManager = metadataManager;
List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(
config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses),
Collections.<String>emptySet(), time.milliseconds());
+ metadataManager.update(Cluster.bootstrap(addresses),
time.milliseconds(), null);
this.metrics = metrics;
this.client = client;
this.runnable = new AdminClientRunnable();
@@ -462,6 +466,13 @@ public class KafkaAdminClient extends AdminClient {
Node provide();
}
+ private class MetadataUpdateNodeIdProvider implements NodeProvider {
+ @Override
+ public Node provide() {
+ return client.leastLoadedNode(time.milliseconds());
+ }
+ }
+
private class ConstantNodeIdProvider implements NodeProvider {
private final int nodeId;
@@ -471,7 +482,16 @@ public class KafkaAdminClient extends AdminClient {
@Override
public Node provide() {
- return metadata.fetch().nodeById(nodeId);
+ if (metadataManager.isReady() &&
+ (metadataManager.nodeById(nodeId) != null)) {
+ return metadataManager.nodeById(nodeId);
+ }
+ // If we can't find the node with the given constant ID, we
schedule a
+ // metadata update and hope it appears. This behavior is useful
for avoiding
+ // flaky behavior in tests when the cluster is starting up and not
all nodes
+ // have appeared.
+ metadataManager.requestUpdate();
+ return null;
}
}
@@ -481,7 +501,12 @@ public class KafkaAdminClient extends AdminClient {
private class ControllerNodeProvider implements NodeProvider {
@Override
public Node provide() {
- return metadata.fetch().controller();
+ if (metadataManager.isReady() &&
+ (metadataManager.controller() != null)) {
+ return metadataManager.controller();
+ }
+ metadataManager.requestUpdate();
+ return null;
}
}
@@ -491,23 +516,40 @@ public class KafkaAdminClient extends AdminClient {
private class LeastLoadedNodeProvider implements NodeProvider {
@Override
public Node provide() {
- return client.leastLoadedNode(time.milliseconds());
+ if (metadataManager.isReady()) {
+ // This may return null if all nodes are busy.
+ // In that case, we will postpone node assignment.
+ return client.leastLoadedNode(time.milliseconds());
+ }
+ metadataManager.requestUpdate();
+ return null;
}
}
abstract class Call {
+ private final boolean internal;
private final String callName;
private final long deadlineMs;
private final NodeProvider nodeProvider;
private int tries = 0;
private boolean aborted = false;
+ private Node curNode = null;
- Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+ Call(boolean internal, String callName, long deadlineMs, NodeProvider
nodeProvider) {
+ this.internal = internal;
this.callName = callName;
this.deadlineMs = deadlineMs;
this.nodeProvider = nodeProvider;
}
+ Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+ this(false, callName, deadlineMs, nodeProvider);
+ }
+
+ protected Node curNode() {
+ return curNode;
+ }
+
/**
* Handle a failure.
*
@@ -615,6 +657,10 @@ public class KafkaAdminClient extends AdminClient {
public String toString() {
return "Call(callName=" + callName + ", deadlineMs=" + deadlineMs
+ ")";
}
+
+ public boolean isInternal() {
+ return internal;
+ }
}
static class TimeoutProcessorFactory {
@@ -698,43 +744,15 @@ public class KafkaAdminClient extends AdminClient {
private List<Call> newCalls = new LinkedList<>();
/**
- * Check if the AdminClient metadata is ready.
- * We need to know who the controller is, and have a non-empty view of
the cluster.
- *
- * @param prevMetadataVersion The previous metadata version
which wasn't usable.
- * @return null if the metadata is usable;
the current metadata
- * version otherwise
- */
- private Integer checkMetadataReady(Integer prevMetadataVersion) {
- if (prevMetadataVersion != null) {
- if (prevMetadataVersion == metadata.version())
- return prevMetadataVersion;
- }
- Cluster cluster = metadata.fetch();
- if (cluster.nodes().isEmpty()) {
- log.trace("Metadata is not ready yet. No cluster nodes
found.");
- return metadata.requestUpdate();
- }
- if (cluster.controller() == null) {
- log.trace("Metadata is not ready yet. No controller found.");
- return metadata.requestUpdate();
- }
- if (prevMetadataVersion != null) {
- log.trace("Metadata is now ready.");
- }
- return null;
- }
-
- /**
- * Time out the elements in the newCalls list which are expired.
+ * Time out the elements in the pendingCalls list which are expired.
*
* @param processor The timeout processor.
*/
- private synchronized void timeoutNewCalls(TimeoutProcessor processor) {
- int numTimedOut = processor.handleTimeouts(newCalls,
+ private void timeoutPendingCalls(TimeoutProcessor processor,
List<Call> pendingCalls) {
+ int numTimedOut = processor.handleTimeouts(pendingCalls,
"Timed out waiting for a node assignment.");
if (numTimedOut > 0)
- log.debug("Timed out {} new calls.", numTimedOut);
+ log.debug("Timed out {} pending calls.", numTimedOut);
}
/**
@@ -743,7 +761,7 @@ public class KafkaAdminClient extends AdminClient {
* @param processor The timeout processor.
* @param callsToSend A map of nodes to the calls they need to
handle.
*/
- private void timeoutCallsToSend(TimeoutProcessor processor, Map<Node,
List<Call>> callsToSend) {
+ private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node,
List<Call>> callsToSend) {
int numTimedOut = 0;
for (List<Call> callList : callsToSend.values()) {
numTimedOut += processor.handleTimeouts(callList,
@@ -751,48 +769,52 @@ public class KafkaAdminClient extends AdminClient {
}
if (numTimedOut > 0)
log.debug("Timed out {} call(s) with assigned nodes.",
numTimedOut);
+ return numTimedOut;
}
/**
- * Choose nodes for the calls in the callsToSend list.
+ * Drain all the calls from newCalls into pendingCalls.
*
* This function holds the lock for the minimum amount of time, to
avoid blocking
* users of AdminClient who will also take the lock to add new calls.
- *
- * @param now The current time in milliseconds.
- * @param callsToSend A map of nodes to the calls they need to
handle.
- *
*/
- private void chooseNodesForNewCalls(long now, Map<Node, List<Call>>
callsToSend) {
- List<Call> newCallsToAdd = null;
- synchronized (this) {
- if (newCalls.isEmpty()) {
- return;
- }
- newCallsToAdd = newCalls;
- newCalls = new LinkedList<>();
- }
- for (Call call : newCallsToAdd) {
- chooseNodeForNewCall(now, callsToSend, call);
+ private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) {
+ if (!newCalls.isEmpty()) {
+ pendingCalls.addAll(newCalls);
+ newCalls.clear();
}
}
/**
- * Choose a node for a new call.
+ * Choose nodes for the calls in the pendingCalls list.
*
* @param now The current time in milliseconds.
+ * @param pendingIter An iterator yielding pending calls.
* @param callsToSend A map of nodes to the calls they need to
handle.
- * @param call The call.
+ *
*/
- private void chooseNodeForNewCall(long now, Map<Node, List<Call>>
callsToSend, Call call) {
- Node node = call.nodeProvider.provide();
- if (node == null) {
- call.fail(now, new BrokerNotAvailableException(
- String.format("Error choosing node for %s: no node
found.", call.callName)));
- return;
+ private void chooseNodesForPendingCalls(long now, Iterator<Call>
pendingIter,
+ Map<Node, List<Call>> callsToSend) {
+ while (pendingIter.hasNext()) {
+ Call call = pendingIter.next();
+ Node node = null;
+ try {
+ node = call.nodeProvider.provide();
+ } catch (Throwable t) {
+ // Handle authentication errors while choosing nodes.
+ log.debug("Unable to choose node for {}", call, t);
+ pendingIter.remove();
+ call.fail(now, t);
+ }
+ if (node != null) {
+ log.trace("Assigned {} to node {}", call, node);
+ pendingIter.remove();
+ call.curNode = node;
+ getOrCreateListValue(callsToSend, node).add(call);
+ } else {
+ log.trace("Unable to assign {} to a node.", call);
+ }
}
- log.trace("Assigned {} to {}", call, node);
- getOrCreateListValue(callsToSend, node).add(call);
}
/**
@@ -881,37 +903,6 @@ public class KafkaAdminClient extends AdminClient {
}
/**
- * If an authentication exception is encountered with connection to
any broker,
- * fail all pending requests.
- */
- private void handleAuthenticationException(long now, Map<Node,
List<Call>> callsToSend) {
- AuthenticationException authenticationException =
metadata.getAndClearAuthenticationException();
- if (authenticationException == null) {
- for (Node node : callsToSend.keySet()) {
- authenticationException =
client.authenticationException(node);
- if (authenticationException != null)
- break;
- }
- }
- if (authenticationException != null) {
- synchronized (this) {
- failCalls(now, newCalls, authenticationException);
- }
- for (List<Call> calls : callsToSend.values()) {
- failCalls(now, calls, authenticationException);
- }
- callsToSend.clear();
- }
- }
-
- private void failCalls(long now, List<Call> calls,
AuthenticationException authenticationException) {
- for (Call call : calls) {
- call.fail(now, authenticationException);
- }
- calls.clear();
- }
-
- /**
* Handle responses from the server.
*
* @param now The current time in milliseconds.
@@ -952,9 +943,14 @@ public class KafkaAdminClient extends AdminClient {
if (response.versionMismatch() != null) {
call.fail(now, response.versionMismatch());
} else if (response.wasDisconnected()) {
- call.fail(now, new DisconnectException(String.format(
- "Cancelled %s request with correlation id %s due to
node %s being disconnected",
- call.callName, correlationId,
response.destination())));
+ AuthenticationException authException =
client.authenticationException(call.curNode());
+ if (authException != null) {
+ call.fail(now, authException);
+ } else {
+ call.fail(now, new DisconnectException(String.format(
+ "Cancelled %s request with correlation id %s due
to node %s being disconnected",
+ call.callName, correlationId,
response.destination())));
+ }
} else {
try {
call.handleResponse(response.responseBody());
@@ -970,13 +966,41 @@ public class KafkaAdminClient extends AdminClient {
}
}
- private synchronized boolean threadShouldExit(long now, long
curHardShutdownTimeMs,
+ private boolean hasActiveExternalCalls(Collection<Call> calls) {
+ for (Call call : calls) {
+ if (!call.isInternal()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Return true if there are currently active external calls.
+ */
+ private boolean hasActiveExternalCalls(List<Call> pendingCalls,
Map<Node, List<Call>> callsToSend, Map<Integer, Call>
correlationIdToCalls) {
- if (newCalls.isEmpty() && callsToSend.isEmpty() &&
correlationIdToCalls.isEmpty()) {
+ if (hasActiveExternalCalls(pendingCalls)) {
+ return true;
+ }
+ for (List<Call> callList : callsToSend.values()) {
+ if (hasActiveExternalCalls(callList)) {
+ return true;
+ }
+ }
+ if (hasActiveExternalCalls(correlationIdToCalls.values())) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean threadShouldExit(long now, long curHardShutdownTimeMs,
List<Call> pendingCalls,
+ Map<Node, List<Call>> callsToSend, Map<Integer, Call>
correlationIdToCalls) {
+ if (!hasActiveExternalCalls(pendingCalls, callsToSend,
correlationIdToCalls)) {
log.trace("All work has been completed, and the I/O thread is
now exiting.");
return true;
}
- if (now > curHardShutdownTimeMs) {
+ if (now >= curHardShutdownTimeMs) {
log.info("Forcing a hard I/O thread shutdown. Requests in
progress will be aborted.");
return true;
}
@@ -986,38 +1010,46 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void run() {
- /*
+ /**
+ * Calls which have not yet been assigned to a node.
+ * Only accessed from this thread.
+ */
+ ArrayList<Call> pendingCalls = new ArrayList<>();
+
+ /**
* Maps nodes to calls that we want to send.
+ * Only accessed from this thread.
*/
Map<Node, List<Call>> callsToSend = new HashMap<>();
- /*
+ /**
* Maps node ID strings to calls that have been sent.
+ * Only accessed from this thread.
*/
Map<String, List<Call>> callsInFlight = new HashMap<>();
- /*
+ /**
* Maps correlation IDs to calls that have been sent.
+ * Only accessed from this thread.
*/
Map<Integer, Call> correlationIdToCalls = new HashMap<>();
- /*
- * The previous metadata version which wasn't usable, or null if
there is none.
- */
- Integer prevMetadataVersion = null;
-
long now = time.milliseconds();
log.trace("Thread starting");
while (true) {
+ // Copy newCalls into pendingCalls.
+ drainNewCalls(pendingCalls);
+
// Check if the AdminClient thread should shut down.
long curHardShutdownTimeMs = hardShutdownTimeMs.get();
if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
- threadShouldExit(now, curHardShutdownTimeMs,
callsToSend, correlationIdToCalls))
+ threadShouldExit(now, curHardShutdownTimeMs,
pendingCalls,
+ callsToSend, correlationIdToCalls))
break;
// Handle timeouts.
TimeoutProcessor timeoutProcessor =
timeoutProcessorFactory.create(now);
- timeoutNewCalls(timeoutProcessor);
+ timeoutPendingCalls(timeoutProcessor, pendingCalls);
timeoutCallsToSend(timeoutProcessor, callsToSend);
timeoutCallsInFlight(timeoutProcessor, callsInFlight);
@@ -1026,12 +1058,22 @@ public class KafkaAdminClient extends AdminClient {
pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs
- now);
}
- // Handle new calls and metadata update requests.
- prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
- if (prevMetadataVersion == null) {
- chooseNodesForNewCalls(now, callsToSend);
- pollTimeout = Math.min(pollTimeout,
- sendEligibleCalls(now, callsToSend,
correlationIdToCalls, callsInFlight));
+ // Choose nodes for our pending calls.
+ chooseNodesForPendingCalls(now, pendingCalls.iterator(),
callsToSend);
+ long metadataFetchDelayMs =
metadataManager.metadataFetchDelayMs(now);
+ if (metadataFetchDelayMs == 0) {
+ metadataManager.transitionToUpdatePending(now);
+ Call metadataCall = makeMetadataCall(now);
+ // Create a new metadata fetch call and add it to the end
of pendingCalls.
+ // Assign a node for just the new call (we handled the
other pending nodes above).
+ pendingCalls.add(metadataCall);
+ chooseNodesForPendingCalls(now,
pendingCalls.listIterator(pendingCalls.size() - 1),
+ callsToSend);
+ }
+ pollTimeout = Math.min(pollTimeout,
+ sendEligibleCalls(now, callsToSend, correlationIdToCalls,
callsInFlight));
+ if (metadataFetchDelayMs > 0) {
+ pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
}
// Wait for network responses.
@@ -1041,7 +1083,6 @@ public class KafkaAdminClient extends AdminClient {
// Update the current time and handle the latest responses.
now = time.milliseconds();
- handleAuthenticationException(now, callsToSend);
handleResponses(now, responses, callsInFlight,
correlationIdToCalls);
}
int numTimedOut = 0;
@@ -1051,10 +1092,13 @@ public class KafkaAdminClient extends AdminClient {
"The AdminClient thread has exited.");
newCalls = null;
}
+ numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls,
+ "The AdminClient thread has exited.");
+ numTimedOut += timeoutCallsToSend(timeoutProcessor, callsToSend);
numTimedOut +=
timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
"The AdminClient thread has exited.");
if (numTimedOut > 0) {
- log.debug("Timed out {} remaining operations.", numTimedOut);
+ log.debug("Timed out {} remaining operation(s).", numTimedOut);
}
closeQuietly(client, "KafkaClient");
closeQuietly(metrics, "Metrics");
@@ -1106,6 +1150,40 @@ public class KafkaAdminClient extends AdminClient {
enqueue(call, now);
}
}
+
+ /**
+ * Create a new metadata call.
+ */
+ private Call makeMetadataCall(long now) {
+ return new Call(true, "fetchMetadata", calcDeadlineMs(now,
defaultTimeoutMs),
+ new MetadataUpdateNodeIdProvider()) {
+ @Override
+ public AbstractRequest.Builder createRequest(int timeoutMs) {
+ // Since this only requests node information, it's safe to
pass true
+ // for allowAutoTopicCreation (and it simplifies
communication with
+ // older brokers)
+ return new
MetadataRequest.Builder(Collections.<String>emptyList(), true);
+ }
+
+ @Override
+ public void handleResponse(AbstractResponse abstractResponse) {
+ MetadataResponse response = (MetadataResponse)
abstractResponse;
+ metadataManager.update(response.cluster(),
time.milliseconds(), null);
+ }
+
+ @Override
+ public void handleFailure(Throwable e) {
+ if (e instanceof AuthenticationException) {
+ log.info("Unable to fetch cluster metadata from node
{} because of " +
+ "authentication error", curNode(), e);
+ metadataManager.update(Cluster.empty(),
time.milliseconds(), (AuthenticationException) e);
+ } else {
+ log.info("Unable to fetch cluster metadata from node
{}",
+ curNode(), e);
+ }
+ }
+ };
+ }
}
/**
@@ -1149,6 +1227,14 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
CreateTopicsResponse response = (CreateTopicsResponse)
abstractResponse;
+ // Check for controller change
+ for (ApiError error : response.errors().values()) {
+ if (error.error() == Errors.NOT_CONTROLLER) {
+ metadataManager.clearController();
+ metadataManager.requestUpdate();
+ throw error.exception();
+ }
+ }
// Handle server responses for particular topics.
for (Map.Entry<String, ApiError> entry :
response.errors().entrySet()) {
KafkaFutureImpl<Void> future =
topicFutures.get(entry.getKey());
@@ -1212,6 +1298,14 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
DeleteTopicsResponse response = (DeleteTopicsResponse)
abstractResponse;
+ // Check for controller change
+ for (Errors error : response.errors().values()) {
+ if (error == Errors.NOT_CONTROLLER) {
+ metadataManager.clearController();
+ metadataManager.requestUpdate();
+ throw error.exception();
+ }
+ }
// Handle server responses for particular topics.
for (Map.Entry<String, Errors> entry :
response.errors().entrySet()) {
KafkaFutureImpl<Void> future =
topicFutures.get(entry.getKey());
@@ -1982,6 +2076,14 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
CreatePartitionsResponse response = (CreatePartitionsResponse)
abstractResponse;
+ // Check for controller change
+ for (ApiError error : response.errors().values()) {
+ if (error.error() == Errors.NOT_CONTROLLER) {
+ metadataManager.clearController();
+ metadataManager.requestUpdate();
+ throw error.exception();
+ }
+ }
for (Map.Entry<String, ApiError> result :
response.errors().entrySet()) {
KafkaFutureImpl<Void> future =
futures.get(result.getKey());
if (result.getValue().isSuccess()) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
new file mode 100644
index 0000000..63e7fc8
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internal;
+
+import org.apache.kafka.clients.MetadataUpdater;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Manages the metadata for KafkaAdminClient.
+ *
+ * This class is not thread-safe. It is only accessed from the AdminClient
+ * service thread (which also uses the NetworkClient).
+ */
+public class AdminMetadataManager {
+ private Logger log;
+
+ /**
+ * The timer.
+ */
+ private final Time time;
+
+ /**
+ * The minimum amount of time that we should wait between subsequent
+ * retries, when fetching metadata.
+ */
+ private final long refreshBackoffMs;
+
+ /**
+ * The minimum amount of time that we should wait before triggering an
+ * automatic metadata refresh.
+ */
+ private final long metadataExpireMs;
+
+ /**
+ * Used to update the NetworkClient metadata.
+ */
+ private final AdminMetadataUpdater updater;
+
+ /**
+ * The current metadata state.
+ */
+ private State state = State.QUIESCENT;
+
+ /**
+ * The time in wall-clock milliseconds when we last updated the metadata.
+ */
+ private long lastMetadataUpdateMs = 0;
+
+ /**
+ * The time in wall-clock milliseconds when we last attempted to fetch new
+ * metadata.
+ */
+ private long lastMetadataFetchAttemptMs = 0;
+
+ /**
+ * The current cluster information.
+ */
+ private Cluster cluster = Cluster.empty();
+
+ /**
+ * If we got an authorization exception when we last attempted to fetch
+ * metadata, this is it; null, otherwise.
+ */
+ private AuthenticationException authException = null;
+
+ public class AdminMetadataUpdater implements MetadataUpdater {
+ @Override
+ public List<Node> fetchNodes() {
+ return cluster.nodes();
+ }
+
+ @Override
+ public boolean isUpdateDue(long now) {
+ return false;
+ }
+
+ @Override
+ public long maybeUpdate(long now) {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public void handleDisconnection(String destination) {
+ // Do nothing
+ }
+
+ @Override
+ public void handleAuthenticationFailure(AuthenticationException e) {
+ log.info("AdminMetadataManager got AuthenticationException", e);
+ update(Cluster.empty(), time.milliseconds(), e);
+ }
+
+ @Override
+ public void handleCompletedMetadataResponse(RequestHeader
requestHeader, long now, MetadataResponse metadataResponse) {
+ // Do nothing
+ }
+
+ @Override
+ public void requestUpdate() {
+ // Do nothing
+ }
+ }
+
+ /**
+ * The current AdminMetadataManager state.
+ */
+ enum State {
+ QUIESCENT,
+ UPDATE_REQUESTED,
+ UPDATE_PENDING;
+ }
+
+ public AdminMetadataManager(LogContext logContext, Time time, long
refreshBackoffMs,
+ long metadataExpireMs) {
+ this.log = logContext.logger(AdminMetadataManager.class);
+ this.time = time;
+ this.refreshBackoffMs = refreshBackoffMs;
+ this.metadataExpireMs = metadataExpireMs;
+ this.updater = new AdminMetadataUpdater();
+ }
+
+ public AdminMetadataUpdater updater() {
+ return updater;
+ }
+
+ public boolean isReady() {
+ if (authException != null) {
+ log.debug("Metadata is not usable: failed to get metadata.",
authException);
+ throw authException;
+ }
+ if (cluster.nodes().isEmpty()) {
+ log.trace("Metadata is not ready: bootstrap nodes have not been " +
+ "initialized yet.");
+ return false;
+ }
+ if (cluster.isBootstrapConfigured()) {
+ log.trace("Metadata is not ready: we have not fetched metadata
from " +
+ "the bootstrap nodes yet.");
+ return false;
+ }
+ log.trace("Metadata is ready to use.");
+ return true;
+ }
+
+ public Node controller() {
+ return cluster.controller();
+ }
+
+ public Node nodeById(int nodeId) {
+ return cluster.nodeById(nodeId);
+ }
+
+ public void requestUpdate() {
+ if (state == State.QUIESCENT) {
+ state = State.UPDATE_REQUESTED;
+ log.debug("Requesting metadata update.");
+ }
+ }
+
+ public void clearController() {
+ if (cluster.controller() != null) {
+ log.trace("Clearing cached controller node {}.",
cluster.controller());
+ this.cluster = new Cluster(cluster.clusterResource().clusterId(),
+ cluster.nodes(),
+ Collections.<PartitionInfo>emptySet(),
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(),
+ null);
+ }
+ }
+
+ /**
+ * Determine if the AdminClient should fetch new metadata.
+ */
+ public long metadataFetchDelayMs(long now) {
+ switch (state) {
+ case QUIESCENT:
+ // Calculate the time remaining until the next periodic update.
+ // We want to avoid making many metadata requests in a short
amount of time,
+ // so there is a metadata refresh backoff period.
+ long timeSinceUpdate = now - lastMetadataUpdateMs;
+ long timeRemainingUntilUpdate = metadataExpireMs -
timeSinceUpdate;
+ long timeSinceAttempt = now - lastMetadataFetchAttemptMs;
+ long timeRemainingUntilAttempt = refreshBackoffMs -
timeSinceAttempt;
+ return Math.max(Math.max(0L, timeRemainingUntilUpdate),
timeRemainingUntilAttempt);
+ case UPDATE_REQUESTED:
+ // An update has been explicitly requested. Do it as soon as
possible.
+ return 0;
+ default:
+ // An update is already pending, so we don't need to initiate
another one.
+ return Long.MAX_VALUE;
+ }
+ }
+
+ /**
+ * Transition into the UPDATE_PENDING state. Updates
lastMetadataFetchAttemptMs.
+ */
+ public void transitionToUpdatePending(long now) {
+ this.state = State.UPDATE_PENDING;
+ this.lastMetadataFetchAttemptMs = now;
+ }
+
+ /**
+ * Receive new metadata, and transition into the QUIESCENT state.
+ * Updates lastMetadataUpdateMs, cluster, and authException.
+ */
+ public void update(Cluster cluster, long now, AuthenticationException
authException) {
+ if (cluster.isBootstrapConfigured()) {
+ log.debug("Setting bootstrap cluster metadata {}.", cluster);
+ } else {
+ log.debug("Received cluster metadata {}{}.",
+ cluster, authException == null ? "" : " with authentication
exception.");
+ }
+ this.state = State.QUIESCENT;
+ this.lastMetadataUpdateMs = now;
+ this.authException = authException;
+ if (!cluster.nodes().isEmpty()) {
+ this.cluster = cluster;
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 0c59f33..ccbaa30 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -274,7 +274,8 @@ public final class Cluster {
@Override
public String toString() {
- return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " +
this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
+ return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " +
this.nodes +
+ ", partitions = " + this.partitionsByTopicPartition.values() + ",
controller = " + controller + ")";
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 37b43e5..7a8ba1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -81,6 +81,7 @@ public class MockClient implements KafkaClient {
private Node node = null;
private final Set<String> ready = new HashSet<>();
private final Map<Node, Long> blackedOut = new HashMap<>();
+ private final Map<Node, Long> pendingAuthenticationErrors = new
HashMap<>();
private final Map<Node, AuthenticationException> authenticationErrors =
new HashMap<>();
// Use concurrent queue for requests so that requests may be queried from
a different thread
private final Queue<ClientRequest> requests = new
ConcurrentLinkedDeque<>();
@@ -128,11 +129,16 @@ public class MockClient implements KafkaClient {
}
public void authenticationFailed(Node node, long duration) {
+ pendingAuthenticationErrors.remove(node);
authenticationErrors.put(node, (AuthenticationException)
Errors.SASL_AUTHENTICATION_FAILED.exception());
disconnect(node.idString());
blackout(node, duration);
}
+ public void createPendingAuthenticationError(Node node, long blackoutMs) {
+ pendingAuthenticationErrors.put(node, blackoutMs);
+ }
+
private boolean isBlackedOut(Node node) {
if (blackedOut.containsKey(node)) {
long expiration = blackedOut.get(node);
@@ -174,6 +180,26 @@ public class MockClient implements KafkaClient {
@Override
public void send(ClientRequest request, long now) {
+ // Check if the request is directed to a node with a pending
authentication error.
+ for (Iterator<Map.Entry<Node, Long>> authErrorIter =
+ pendingAuthenticationErrors.entrySet().iterator();
authErrorIter.hasNext(); ) {
+ Map.Entry<Node, Long> entry = authErrorIter.next();
+ Node node = entry.getKey();
+ long blackoutMs = entry.getValue();
+ if (node.idString().equals(request.destination())) {
+ authErrorIter.remove();
+ // Set up a disconnected ClientResponse and create an
authentication error
+ // for the affected node.
+ authenticationFailed(node, blackoutMs);
+ AbstractRequest.Builder<?> builder = request.requestBuilder();
+ short version =
nodeApiVersions.latestUsableVersion(request.apiKey(),
builder.oldestAllowedVersion(),
+ builder.latestAllowedVersion());
+ ClientResponse resp = new
ClientResponse(request.makeHeader(version), request.callback(),
request.destination(),
+ request.createdTimeMs(), time.milliseconds(), true, null,
null);
+ responses.add(resp);
+ return;
+ }
+ }
Iterator<FutureResponse> iterator = futureResponses.iterator();
while (iterator.hasNext()) {
FutureResponse futureResp = iterator.next();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 10281fb..f862c14 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -16,11 +16,12 @@
*/
package org.apache.kafka.clients.admin;
-import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.Time;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -50,13 +51,25 @@ public class AdminClientUnitTestEnv implements
AutoCloseable {
}
public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String,
Object> config) {
+ this(newMockClient(time, cluster), time, cluster, config);
+ }
+
+ private static MockClient newMockClient(Time time, Cluster cluster) {
+ MockClient mockClient = new MockClient(time);
+ mockClient.prepareResponse(new MetadataResponse(cluster.nodes(),
+ cluster.clusterResource().clusterId(),
+ cluster.controller().id(),
+ Collections.<MetadataResponse.TopicMetadata>emptyList()));
+ return mockClient;
+ }
+
+ public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster
cluster,
+ Map<String, Object> config) {
this.time = time;
this.cluster = cluster;
AdminClientConfig adminClientConfig = new AdminClientConfig(config);
- Metadata metadata = new
Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
- this.mockClient = new MockClient(time, metadata);
- this.adminClient = KafkaAdminClient.createInternal(adminClientConfig,
mockClient, metadata, time);
+ this.mockClient = mockClient;
+ this.adminClient = KafkaAdminClient.createInternal(adminClientConfig,
mockClient, time);
}
public Time time() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0debed3..cdd9a28 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin;
+import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -39,6 +40,7 @@ import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -65,6 +67,8 @@ import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourceFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Ignore;
@@ -86,6 +90,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.requests.ResourceType.BROKER;
@@ -169,15 +174,18 @@ public class KafkaAdminClientTest {
KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG,
"myCustomId")));
}
- private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
+ private static Cluster mockCluster(int controllerIndex) {
HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
nodes.put(1, new Node(1, "localhost", 8122));
nodes.put(2, new Node(2, "localhost", 8123));
- Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+ return new Cluster("mockClusterId", nodes.values(),
Collections.<PartitionInfo>emptySet(),
Collections.<String>emptySet(),
- Collections.<String>emptySet(), nodes.get(0));
- return new AdminClientUnitTestEnv(cluster, configVals);
+ Collections.<String>emptySet(), nodes.get(controllerIndex));
+ }
+
+ private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
+ return new AdminClientUnitTestEnv(mockCluster(0), configVals);
}
@Test
@@ -204,7 +212,13 @@ public class KafkaAdminClientTest {
*/
@Test
public void testTimeoutWithoutMetadata() throws Exception {
- try (AdminClientUnitTestEnv env =
mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
+ Cluster cluster = mockCluster(0);
+ MockClient mockClient = new MockClient(Time.SYSTEM);
+ try (final AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockClient,
+ Time.SYSTEM,
+ cluster,
+ newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:8121",
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setNode(new Node(0, "localhost", 8121));
env.kafkaClient().prepareResponse(new
CreateTopicsResponse(Collections.singletonMap("myTopic", new
ApiError(Errors.NONE, ""))));
@@ -215,6 +229,30 @@ public class KafkaAdminClientTest {
}
}
+ /**
+ * Test that we propagate exceptions encountered when fetching metadata.
+ */
+ @Test
+ public void testPropagatedMetadataFetchException() throws Exception {
+ Cluster cluster = mockCluster(0);
+ MockClient mockClient = new MockClient(Time.SYSTEM);
+ mockClient.createPendingAuthenticationError(cluster.nodeById(0),
+ TimeUnit.DAYS.toMillis(1));
+ try (final AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockClient,
+ Time.SYSTEM,
+ cluster,
+ newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:8121",
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().setNode(env.cluster().nodeById(0));
+ env.kafkaClient().prepareResponse(new
CreateTopicsResponse(Collections.singletonMap("myTopic", new
ApiError(Errors.NONE, ""))));
+ KafkaFuture<Void> future = env.adminClient().createTopics(
+ Collections.singleton(new NewTopic("myTopic",
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+ new CreateTopicsOptions().timeoutMs(1000)).all();
+ assertFutureError(future, SaslAuthenticationException.class);
+ }
+ }
+
@Test
public void testCreateTopics() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -230,6 +268,30 @@ public class KafkaAdminClientTest {
}
@Test
+ public void testCreateTopicsHandleNotControllerException() throws
Exception {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareMetadataUpdate(mockCluster(0),
Collections.<String>emptySet());
+ env.kafkaClient().prepareMetadataUpdate(mockCluster(1),
Collections.<String>emptySet());
+ env.kafkaClient().setNode(env.cluster().nodeById(0));
+ env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
+ Collections.singletonMap("myTopic", new
ApiError(Errors.NOT_CONTROLLER, ""))),
+ env.cluster().nodeById(0));
+ env.kafkaClient().prepareResponse(new
MetadataResponse(env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 1,
+ Collections.<MetadataResponse.TopicMetadata>emptyList()));
+ env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
+ Collections.singletonMap("myTopic", new
ApiError(Errors.NONE, ""))),
+ env.cluster().nodeById(1));
+ KafkaFuture<Void> future = env.adminClient().createTopics(
+ Collections.singleton(new NewTopic("myTopic",
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+ new CreateTopicsOptions().timeoutMs(10000)).all();
+ future.get();
+ }
+ }
+
+ @Test
public void testInvalidTopicNames() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -266,33 +328,31 @@ public class KafkaAdminClientTest {
}
@Test
- public void
testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws
Exception {
- AdminClientUnitTestEnv env = mockClientEnv();
- Node node = env.cluster().controller();
- env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(node);
- env.kafkaClient().authenticationFailed(node, 300);
-
- callAdminClientApisAndExpectAnAuthenticationError(env);
-
- // wait less than the blackout period, the connection should fail and
the authentication error should remain
- env.time().sleep(30);
- assertTrue(env.kafkaClient().connectionFailed(node));
- callAdminClientApisAndExpectAnAuthenticationError(env);
-
- env.close();
+ public void testAdminClientApisAuthenticationFailure() throws Exception {
+ Cluster cluster = mockCluster(0);
+ MockClient mockClient = new MockClient(Time.SYSTEM);
+ mockClient.createPendingAuthenticationError(cluster.nodeById(0),
+ TimeUnit.DAYS.toMillis(1));
+ try (final AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockClient,
+ Time.SYSTEM,
+ cluster,
+ newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:8121",
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().setNode(env.cluster().controller());
+ callAdminClientApisAndExpectAnAuthenticationError(env);
+ }
}
private void
callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env)
throws InterruptedException {
- env.kafkaClient().prepareMetadataUpdate(env.cluster(),
Collections.<String>emptySet());
-
try {
env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic",
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
new CreateTopicsOptions().timeoutMs(10000)).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
- assertTrue("Expected only an authentication error.", e.getCause()
instanceof AuthenticationException);
+ assertTrue("Expected an authentication error, but got " +
Utils.stackTrace(e),
+ e.getCause() instanceof AuthenticationException);
}
try {
@@ -302,35 +362,40 @@ public class KafkaAdminClientTest {
env.adminClient().createPartitions(counts).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
- assertTrue("Expected only an authentication error.", e.getCause()
instanceof AuthenticationException);
+ assertTrue("Expected an authentication error, but got " +
Utils.stackTrace(e),
+ e.getCause() instanceof AuthenticationException);
}
try {
env.adminClient().createAcls(asList(ACL1, ACL2)).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
- assertTrue("Expected only an authentication error.", e.getCause()
instanceof AuthenticationException);
+ assertTrue("Expected an authentication error, but got " +
Utils.stackTrace(e),
+ e.getCause() instanceof AuthenticationException);
}
try {
env.adminClient().describeAcls(FILTER1).values().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
- assertTrue("Expected only an authentication error.", e.getCause()
instanceof AuthenticationException);
+ assertTrue("Expected an authentication error, but got " +
Utils.stackTrace(e),
+ e.getCause() instanceof AuthenticationException);
}
try {
env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
- assertTrue("Expected only an authentication error.", e.getCause()
instanceof AuthenticationException);
+ assertTrue("Expected an authentication error, but got " +
Utils.stackTrace(e),
+ e.getCause() instanceof AuthenticationException);
}
try {
env.adminClient().describeConfigs(Collections.singleton(new
ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
- assertTrue("Expected only an authentication error.", e.getCause()
instanceof AuthenticationException);
+ assertTrue("Expected an authentication error, but got " +
Utils.stackTrace(e),
+ e.getCause() instanceof AuthenticationException);
}
}
@@ -918,7 +983,7 @@ public class KafkaAdminClientTest {
}
boolean callHasExpired(KafkaAdminClient.Call call) {
- if (shouldInjectFailure()) {
+ if ((!call.isInternal()) && shouldInjectFailure()) {
log.debug("Injecting timeout for {}.", call);
return true;
} else {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index cda6879..5b1e155 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -52,7 +52,6 @@ public class TopicAdminTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
env.kafkaClient().setNode(cluster.controller());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().prepareMetadataUpdate(env.cluster(),
Collections.<String>emptySet());
env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic);
@@ -65,7 +64,7 @@ public class TopicAdminTest {
final NewTopic newTopic =
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
- env.kafkaClient().prepareMetadataUpdate(env.cluster(),
Collections.<String>emptySet());
+ env.kafkaClient().setNode(cluster.nodes().iterator().next());
env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic);
--
To stop receiving notification emails like this one, please contact
[email protected].