http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java new file mode 100644 index 0000000..85db168 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -0,0 +1,920 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.storage.KafkaConfigStorage; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * <p> + * Distributed "herder" that coordinates with other 
workers to spread work across multiple processes. + * </p> + * <p> + * Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized + * group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what its current + * configuration state is (where it is in the configuration log). The group coordinator selects one member to take + * this information and assign each instance a subset of the active connectors & tasks to execute. This assignment + * is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose + * to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once + * an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker. + * </p> + * <p> + * In addition to distributing work, the DistributedHerder uses the leader determined during the work assignment + * to select a leader for this generation of the group who is responsible for other tasks that can only be performed + * by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks + * (and therefore, also for creating, destroying, and scaling up/down connectors). 
+ * </p> + */ +public class DistributedHerder implements Herder, Runnable { + private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class); + + private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250; + + private final Worker worker; + private final KafkaConfigStorage configStorage; + private ClusterConfigState configState; + private final Time time; + + private final int workerSyncTimeoutMs; + private final int workerUnsyncBackoffMs; + + private final WorkerGroupMember member; + private final AtomicBoolean stopping; + private final CountDownLatch stopLatch = new CountDownLatch(1); + + // Track enough information about the current membership state to be able to determine which requests via the API + // and the from other nodes are safe to process + private boolean rebalanceResolved; + private ConnectProtocol.Assignment assignment; + + // To handle most external requests, like creating or destroying a connector, we can use a generic request where + // the caller specifies all the code that should be executed. + private final Queue<HerderRequest> requests = new PriorityQueue<>(); + // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when + // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits). + private Set<String> connectorConfigUpdates = new HashSet<>(); + private boolean needsReconfigRebalance; + + private final ExecutorService forwardRequestExecutor; + + public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) { + this(config, worker, null, null, restUrl, new SystemTime()); + } + + // public for testing + public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, Time time) { + this.worker = worker; + if (configStorage != null) { + // For testing. 
Assume configuration has already been performed + this.configStorage = configStorage; + } else { + this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback()); + this.configStorage.configure(config.originals()); + } + configState = ClusterConfigState.EMPTY; + this.time = time; + + this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG); + this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG); + + this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener()); + stopping = new AtomicBoolean(false); + + rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks + needsReconfigRebalance = false; + + forwardRequestExecutor = Executors.newSingleThreadExecutor(); + } + + @Override + public void start() { + Thread thread = new Thread(this, "DistributedHerder"); + thread.start(); + } + + public void run() { + try { + log.info("Herder starting"); + + configStorage.start(); + + log.info("Herder started"); + + while (!stopping.get()) { + tick(); + } + + halt(); + + log.info("Herder stopped"); + } catch (Throwable t) { + log.error("Uncaught exception in herder work thread, exiting: ", t); + stopLatch.countDown(); + System.exit(1); + } finally { + stopLatch.countDown(); + } + } + + // public for testing + public void tick() { + // The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events + // as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is + // performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly + // blocking up this thread (especially those in callbacks due to rebalance events). + + try { + member.ensureActive(); + // Ensure we're in a good state in our group. 
If not restart and everything should be setup to rejoin + if (!handleRebalanceCompleted()) return; + } catch (WakeupException e) { + // May be due to a request from another thread, or might be stopping. If the latter, we need to check the + // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests + // unless we're in the group. + return; + } + + // Process any external requests + final long now = time.milliseconds(); + long nextRequestTimeoutMs = Long.MAX_VALUE; + while (true) { + final HerderRequest next; + synchronized (this) { + next = requests.peek(); + if (next == null) { + break; + } else if (now >= next.at) { + requests.poll(); + } else { + nextRequestTimeoutMs = next.at - now; + break; + } + } + + try { + next.action().call(); + next.callback().onCompletion(null, null); + } catch (Throwable t) { + next.callback().onCompletion(t, null); + } + } + + // Process any configuration updates + Set<String> connectorConfigUpdatesCopy = null; + synchronized (this) { + if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) { + // Connector reconfigs only need local updates since there is no coordination between workers required. + // However, if connectors were added or removed, work needs to be rebalanced since we have more work + // items to distribute among workers. + ClusterConfigState newConfigState = configStorage.snapshot(); + if (!newConfigState.connectors().equals(configState.connectors())) + needsReconfigRebalance = true; + configState = newConfigState; + if (needsReconfigRebalance) { + // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart + // this loop, which will then ensure the rebalance occurs without any other requests being + // processed until it completes. 
+ member.requestRejoin(); + // Any connector config updates will be addressed during the rebalance too + connectorConfigUpdates.clear(); + needsReconfigRebalance = false; + return; + } else if (!connectorConfigUpdates.isEmpty()) { + // We can't start/stop while locked since starting connectors can cause task updates that will + // require writing configs, which in turn make callbacks into this class from another thread that + // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process + // the updates after unlocking. + connectorConfigUpdatesCopy = connectorConfigUpdates; + connectorConfigUpdates = new HashSet<>(); + } + } + } + if (connectorConfigUpdatesCopy != null) { + // If we only have connector config updates, we can just bounce the updated connectors that are + // currently assigned to this worker. + Set<String> localConnectors = assignment == null ? Collections.<String>emptySet() : new HashSet<>(assignment.connectors()); + for (String connectorName : connectorConfigUpdatesCopy) { + if (!localConnectors.contains(connectorName)) + continue; + boolean remains = configState.connectors().contains(connectorName); + log.info("Handling connector-only config update by {} connector {}", + remains ? "restarting" : "stopping", connectorName); + worker.stopConnector(connectorName); + // The update may be a deletion, so verify we actually need to restart the connector + if (remains) + startConnector(connectorName); + } + } + + // Let the group take any actions it needs to + try { + member.poll(nextRequestTimeoutMs); + // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin + if (!handleRebalanceCompleted()) return; + } catch (WakeupException e) { // FIXME should not be WakeupException + // Ignore. Just indicates we need to check the exit flag, for requested actions, etc. 
+ } + } + + // public for testing + public void halt() { + synchronized (this) { + // Clean up any connectors and tasks that are still running. + log.info("Stopping connectors and tasks that are still assigned to this worker."); + for (String connName : new HashSet<>(worker.connectorNames())) { + try { + worker.stopConnector(connName); + } catch (Throwable t) { + log.error("Failed to shut down connector " + connName, t); + } + } + for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) { + try { + worker.stopTask(taskId); + } catch (Throwable t) { + log.error("Failed to shut down task " + taskId, t); + } + } + + member.stop(); + + // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason + // for their failure + while (!requests.isEmpty()) { + HerderRequest request = requests.poll(); + request.callback().onCompletion(new ConnectException("Worker is shutting down"), null); + } + + if (configStorage != null) + configStorage.stop(); + } + } + + @Override + public void stop() { + log.info("Herder stopping"); + + stopping.set(true); + member.wakeup(); + while (stopLatch.getCount() > 0) { + try { + stopLatch.await(); + } catch (InterruptedException e) { + // ignore, should not happen + } + } + + + forwardRequestExecutor.shutdown(); + try { + if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS)) + forwardRequestExecutor.shutdownNow(); + } catch (InterruptedException e) { + // ignore + } + + log.info("Herder stopped"); + } + + @Override + public synchronized void connectors(final Callback<Collection<String>> callback) { + log.trace("Submitting connector listing request"); + + addRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + if (!checkConfigSynced(callback)) + return null; + + callback.onCompletion(null, configState.connectors()); + return null; + } + }, + forwardErrorCallback(callback) + ); + } + + @Override + public synchronized void 
connectorInfo(final String connName, final Callback<ConnectorInfo> callback) { + log.trace("Submitting connector info request {}", connName); + + addRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + if (!checkConfigSynced(callback)) + return null; + + if (!configState.connectors().contains(connName)) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); + } else { + callback.onCompletion(null, new ConnectorInfo(connName, configState.connectorConfig(connName), configState.tasks(connName))); + } + return null; + } + }, + forwardErrorCallback(callback) + ); + } + + @Override + public void connectorConfig(String connName, final Callback<Map<String, String>> callback) { + // Subset of connectorInfo, so piggy back on that implementation + log.trace("Submitting connector config read request {}", connName); + connectorInfo(connName, new Callback<ConnectorInfo>() { + @Override + public void onCompletion(Throwable error, ConnectorInfo result) { + if (error != null) + callback.onCompletion(error, null); + else + callback.onCompletion(null, result.config()); + } + }); + } + + @Override + public void putConnectorConfig(final String connName, Map<String, String> config, final boolean allowReplace, + final Callback<Created<ConnectorInfo>> callback) { + final Map<String, String> connConfig; + if (config == null) { + connConfig = null; + } else if (!config.containsKey(ConnectorConfig.NAME_CONFIG)) { + connConfig = new HashMap<>(config); + connConfig.put(ConnectorConfig.NAME_CONFIG, connName); + } else { + connConfig = config; + } + + log.trace("Submitting connector config write request {}", connName); + + addRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + log.trace("Handling connector config request {}", connName); + if (!isLeader()) { + callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null); + return null; + 
} + + boolean exists = configState.connectors().contains(connName); + if (!allowReplace && exists) { + callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null); + return null; + } + + if (connConfig == null && !exists) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); + return null; + } + + log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors()); + configStorage.putConnectorConfig(connName, connConfig); + + boolean created = !exists && connConfig != null; + // Note that we use the updated connector config despite the fact that we don't have an updated + // snapshot yet. The existing task info should still be accurate. + ConnectorInfo info = connConfig == null ? null : + new ConnectorInfo(connName, connConfig, configState.tasks(connName)); + callback.onCompletion(null, new Created<>(created, info)); + + return null; + } + }, + forwardErrorCallback(callback) + ); + } + + @Override + public synchronized void requestTaskReconfiguration(final String connName) { + log.trace("Submitting connector task reconfiguration request {}", connName); + + addRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + reconfigureConnectorTasksWithRetry(connName); + return null; + } + }, + new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + log.error("Unexpected error during task reconfiguration: ", error); + log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName); + } + } + ); + } + + @Override + public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) { + log.trace("Submitting get task configuration request {}", connName); + + addRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + if 
(!checkConfigSynced(callback)) + return null; + + if (!configState.connectors().contains(connName)) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); + } else { + List<TaskInfo> result = new ArrayList<>(); + for (int i = 0; i < configState.taskCount(connName); i++) { + ConnectorTaskId id = new ConnectorTaskId(connName, i); + result.add(new TaskInfo(id, configState.taskConfig(id))); + } + callback.onCompletion(null, result); + } + return null; + } + }, + forwardErrorCallback(callback) + ); + } + + @Override + public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) { + log.trace("Submitting put task configuration request {}", connName); + + addRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + if (!isLeader()) + callback.onCompletion(new NotLeaderException("Only the leader may write task configurations.", leaderUrl()), null); + else if (!configState.connectors().contains(connName)) + callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); + else { + configStorage.putTaskConfigs(taskConfigListAsMap(connName, configs)); + callback.onCompletion(null, null); + } + return null; + } + }, + forwardErrorCallback(callback) + ); + } + + + // Should only be called from work thread, so synchronization should not be needed + private boolean isLeader() { + return assignment != null && member.memberId().equals(assignment.leader()); + } + + /** + * Get the URL for the leader's REST interface, or null if we do not have the leader's URL yet. + */ + private String leaderUrl() { + if (assignment == null) + return null; + return assignment.leaderUrl(); + } + + /** + * Handle post-assignment operations, either trying to resolve issues that kept assignment from completing, getting + * this node into sync and its work started. 
Since + * + * @return false if we couldn't finish + */ + private boolean handleRebalanceCompleted() { + if (this.rebalanceResolved) + return true; + + // We need to handle a variety of cases after a rebalance: + // 1. Assignment failed + // 1a. We are the leader for the round. We will be leader again if we rejoin now, so we need to catch up before + // even attempting to. If we can't we should drop out of the group because we will block everyone from making + // progress. We can backoff and try rejoining later. + // 1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately, + // otherwise, we just want to wait indefinitely to catch up and rejoin whenver we're finally ready. + // 2. Assignment succeeded. + // 2a. We are caught up on configs. Awesome! We can proceed to run our assigned work. + // 2b. We need to try to catch up. We can do this potentially indefinitely because if it takes to long, we'll + // be kicked out of the group anyway due to lack of heartbeats. + + boolean needsReadToEnd = false; + long syncConfigsTimeoutMs = Long.MAX_VALUE; + boolean needsRejoin = false; + if (assignment.failed()) { + needsRejoin = true; + if (isLeader()) { + log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying."); + needsReadToEnd = true; + syncConfigsTimeoutMs = workerSyncTimeoutMs; + } else if (configState.offset() < assignment.offset()) { + log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying."); + needsReadToEnd = true; + } else { + log.warn("Join group completed, but assignment failed. 
We were up to date, so just retrying."); + } + } else { + if (configState.offset() < assignment.offset()) { + log.warn("Catching up to assignment's config offset."); + needsReadToEnd = true; + } + } + + if (needsReadToEnd) { + // Force exiting this method to avoid creating any connectors/tasks and require immediate rejoining if + // we timed out. This should only happen if we were the leader and didn't finish quickly enough, in which + // case we've waited a long time and should have already left the group OR the timeout should have been + // very long and not having finished also indicates we've waited longer than the session timeout. + if (!readConfigToEnd(syncConfigsTimeoutMs)) + needsRejoin = true; + } + + if (needsRejoin) { + member.requestRejoin(); + return false; + } + + // Should still validate that they match since we may have gone *past* the required offset, in which case we + // should *not* start any tasks and rejoin + if (configState.offset() != assignment.offset()) { + log.info("Current config state offset {} does not match group assignment {}. Forcing rebalance.", configState.offset(), assignment.offset()); + member.requestRejoin(); + return false; + } + + startWork(); + + // We only mark this as resolved once we've actually started work, which allows us to correctly track whether + // what work is currently active and running. If we bail early, the main tick loop + having requested rejoin + // guarantees we'll attempt to rejoin before executing this method again. 
+ rebalanceResolved = true; + return true; + } + + /** + * Try to read to the end of the config log within the given timeout + * @param timeoutMs maximum time to wait to sync to the end of the log + * @return true if successful, false if timed out + */ + private boolean readConfigToEnd(long timeoutMs) { + log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); + try { + configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS); + configState = configStorage.snapshot(); + log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset()); + return true; + } catch (TimeoutException e) { + log.warn("Didn't reach end of config log quickly enough", e); + // TODO: With explicit leave group support, it would be good to explicitly leave the group *before* this + // backoff since it'll be longer than the session timeout + if (isLeader()) + backoff(workerUnsyncBackoffMs); + return false; + } catch (InterruptedException | ExecutionException e) { + throw new ConnectException("Error trying to catch up after assignment", e); + } + } + + private void backoff(long ms) { + Utils.sleep(ms); + } + + private void startWork() { + // Start assigned connectors and tasks + log.info("Starting connectors and tasks using config offset {}", assignment.offset()); + for (String connectorName : assignment.connectors()) { + try { + startConnector(connectorName); + } catch (ConfigException e) { + log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " + + "configuration. 
This connector will not execute until reconfigured.", e); + } + } + for (ConnectorTaskId taskId : assignment.tasks()) { + try { + log.info("Starting task {}", taskId); + Map<String, String> configs = configState.taskConfig(taskId); + TaskConfig taskConfig = new TaskConfig(configs); + worker.addTask(taskId, taskConfig); + } catch (ConfigException e) { + log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " + + "configuration. This task will not execute until reconfigured.", e); + } + } + log.info("Finished starting connectors and tasks"); + } + + // Helper for starting a connector with the given name, which will extract & parse the config, generate connector + // context and add to the worker. This needs to be called from within the main worker thread for this herder. + private void startConnector(String connectorName) { + log.info("Starting connector {}", connectorName); + Map<String, String> configs = configState.connectorConfig(connectorName); + ConnectorConfig connConfig = new ConnectorConfig(configs); + String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); + ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName); + worker.addConnector(connConfig, ctx); + + // Immediately request configuration since this could be a brand new connector. However, also only update those + // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is + // just restoring an existing connector. + reconfigureConnectorTasksWithRetry(connName); + } + + private void reconfigureConnectorTasksWithRetry(final String connName) { + reconfigureConnector(connName, new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + // If we encountered an error, we don't have much choice but to just retry. 
If we don't, we could get + // stuck with a connector that thinks it has generated tasks, but wasn't actually successful and therefore + // never makes progress. The retry has to run through a HerderRequest since this callback could be happening + // from the HTTP request forwarding thread. + if (error != null) { + log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error); + addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS, + new Callable<Void>() { + @Override + public Void call() throws Exception { + reconfigureConnectorTasksWithRetry(connName); + return null; + } + }, new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + log.error("Unexpected error during connector task reconfiguration: ", error); + log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName); + } + } + ); + } + } + }); + } + + // Updates configurations for a connector by requesting them from the connector, filling in parameters provided + // by the system, then checks whether any configs have actually changed before submitting the new configs to storage + private void reconfigureConnector(final String connName, final Callback<Void> cb) { + try { + Map<String, String> configs = configState.connectorConfig(connName); + ConnectorConfig connConfig = new ConnectorConfig(configs); + + List<String> sinkTopics = null; + if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG))) + sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); + + final List<Map<String, String>> taskProps + = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics); + boolean changed = false; + int currentNumTasks = configState.taskCount(connName); + if (taskProps.size() != currentNumTasks) { + log.debug("Change in connector task count from {} to {}, writing updated task 
configurations", currentNumTasks, taskProps.size()); + changed = true; + } else { + int index = 0; + for (Map<String, String> taskConfig : taskProps) { + if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName, index)))) { + log.debug("Change in task configurations, writing updated task configurations"); + changed = true; + break; + } + index++; + } + } + if (changed) { + if (isLeader()) { + configStorage.putTaskConfigs(taskConfigListAsMap(connName, taskProps)); + cb.onCompletion(null, null); + } else { + // We cannot forward the request on the same thread because this reconfiguration can happen in as a + // result of . If we blocked + forwardRequestExecutor.submit(new Runnable() { + @Override + public void run() { + try { + String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks"); + RestServer.httpRequest(reconfigUrl, "POST", taskProps, null); + cb.onCompletion(null, null); + } catch (ConnectException e) { + log.error("Request to leader to reconfigure connector tasks failed", e); + cb.onCompletion(e, null); + } + } + }); + } + } + } catch (Throwable t) { + cb.onCompletion(t, null); + } + } + + // Common handling for requests that get config data. Checks if we are in sync with the current config, which allows + // us to answer requests directly. If we are not, handles invoking the callback with the appropriate error. 
+ private boolean checkConfigSynced(Callback<?> callback) { + if (assignment == null || configState.offset() != assignment.offset()) { + if (!isLeader()) + callback.onCompletion(new NotLeaderException("Cannot get config data because config is not in sync and this is not the leader", leaderUrl()), null); + else + callback.onCompletion(new ConnectException("Cannot get config data because this is the leader node, but it does not have the most up to date configs"), null); + return false; + } + return true; + } + + private void addRequest(Callable<Void> action, Callback<Void> callback) { + addRequest(0, action, callback); + } + + private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) { + HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback); + requests.add(req); + if (requests.peek() == req) + member.wakeup(); + } + + private class HerderRequest implements Comparable<HerderRequest> { + private final long at; + private final Callable<Void> action; + private final Callback<Void> callback; + + public HerderRequest(long at, Callable<Void> action, Callback<Void> callback) { + this.at = at; + this.action = action; + this.callback = callback; + } + + public Callable<Void> action() { + return action; + } + + public Callback<Void> callback() { + return callback; + } + + @Override + public int compareTo(HerderRequest o) { + return Long.compare(at, o.at); + } + } + + private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) { + return new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + if (error != null) + callback.onCompletion(error, null); + } + }; + }; + + + // Config callbacks are triggered from the KafkaConfigStorage thread + private Callback<String> connectorConfigCallback() { + return new Callback<String>() { + @Override + public void onCompletion(Throwable error, String connector) { + log.info("Connector {} config updated", connector); + // 
Stage the update and wake up the work thread. Connector config *changes* only need the one connector + // to be bounced. However, this callback may also indicate a connector *addition*, which does require + // a rebalance, so we need to be careful about what operation we request. + synchronized (DistributedHerder.this) { + connectorConfigUpdates.add(connector); + } + member.wakeup(); + } + }; + } + + private Callback<List<ConnectorTaskId>> taskConfigCallback() { + return new Callback<List<ConnectorTaskId>>() { + @Override + public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) { + log.info("Tasks {} configs updated", tasks); + // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs + // always need a rebalance to ensure offsets get committed. + // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task + // connectors clearly don't need any coordination. + synchronized (DistributedHerder.this) { + needsReconfigRebalance = true; + } + member.wakeup(); + } + }; + } + + // Rebalances are triggered internally from the group member, so these are always executed in the work thread. + private WorkerRebalanceListener rebalanceListener() { + return new WorkerRebalanceListener() { + @Override + public void onAssigned(ConnectProtocol.Assignment assignment) { + // This callback just logs the info and saves it. The actual response is handled in the main loop, which + // ensures the group member's logic for rebalancing can complete, potentially long-running steps to + // catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other + // group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the + // assigned tasks). 
+ log.info("Joined group and got assignment: {}", assignment); + synchronized (DistributedHerder.this) { + DistributedHerder.this.assignment = assignment; + rebalanceResolved = false; + } + // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then + // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the + // main thread. + member.wakeup(); + } + + @Override + public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) { + log.info("Rebalance started"); + + // Note that since we don't reset the assignment, we we don't revoke leadership here. During a rebalance, + // it is still important to have a leader that can write configs, offsets, etc. + + if (rebalanceResolved) { + // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of + // them to finish + // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from + // this worker. Instead, we can let them continue to run but buffer any update requests (which should be + // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of + // unnecessary repeated connections to the source/sink system. + for (String connectorName : connectors) + worker.stopConnector(connectorName); + // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of + // stopping them then state could continue to be reused when the task remains on this worker. For example, + // this would avoid having to close a connection and then reopen it when the task is assigned back to this + // worker again. 
+ for (ConnectorTaskId taskId : tasks) + worker.stopTask(taskId); + + log.info("Finished stopping tasks in preparation for rebalance"); + } else { + log.info("Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks"); + } + + } + }; + } + + + private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connName, List<Map<String, String>> configs) { + int index = 0; + Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>(); + for (Map<String, String> taskConfigMap : configs) { + ConnectorTaskId taskId = new ConnectorTaskId(connName, index); + result.put(taskId, taskConfigMap); + index++; + } + return result; + } +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java new file mode 100644 index 0000000..5f94b53 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.connect.errors.ConnectException; + +/** + * Indicates an operation was not permitted because it can only be performed on the leader and this worker is not currently + * the leader. 
+ */ +public class NotLeaderException extends ConnectException { + private final String leaderUrl; + + public NotLeaderException(String msg, String leaderUrl) { + super(msg); + this.leaderUrl = leaderUrl; + } + + public NotLeaderException(String msg, String leaderUrl, Throwable throwable) { + super(msg, throwable); + this.leaderUrl = leaderUrl; + } + + public NotLeaderException(String leaderUrl, Throwable throwable) { + super(throwable); + this.leaderUrl = leaderUrl; + } + + public String leaderUrl() { + return leaderUrl; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java new file mode 100644 index 0000000..082e235 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.clients.consumer.internals.AbstractCoordinator; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.CircularIterator; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.storage.KafkaConfigStorage; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * This class manages the coordination process with the Kafka group coordinator on the broker for managing assignments + * to workers. + */ +public final class WorkerCoordinator extends AbstractCoordinator implements Closeable { + private static final Logger log = LoggerFactory.getLogger(WorkerCoordinator.class); + + // Currently doesn't support multiple task assignment strategies, so we just fill in a default value + public static final String DEFAULT_SUBPROTOCOL = "default"; + + private final String restUrl; + private final KafkaConfigStorage configStorage; + private ConnectProtocol.Assignment assignmentSnapshot; + private final WorkerCoordinatorMetrics sensors; + private ClusterConfigState configSnapshot; + private final WorkerRebalanceListener listener; + + private boolean rejoinRequested; + + /** + * Initialize the coordination manager. 
+ */ + public WorkerCoordinator(ConsumerNetworkClient client, + String groupId, + int sessionTimeoutMs, + int heartbeatIntervalMs, + Metrics metrics, + String metricGrpPrefix, + Map<String, String> metricTags, + Time time, + long requestTimeoutMs, + long retryBackoffMs, + String restUrl, + KafkaConfigStorage configStorage, + WorkerRebalanceListener listener) { + super(client, + groupId, + sessionTimeoutMs, + heartbeatIntervalMs, + metrics, + metricGrpPrefix, + metricTags, + time, + retryBackoffMs); + this.restUrl = restUrl; + this.configStorage = configStorage; + this.assignmentSnapshot = null; + this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); + this.listener = listener; + this.rejoinRequested = false; + } + + public void requestRejoin() { + rejoinRequested = true; + } + + @Override + public String protocolType() { + return "connect"; + } + + @Override + public LinkedHashMap<String, ByteBuffer> metadata() { + LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>(); + configSnapshot = configStorage.snapshot(); + ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset()); + metadata.put(DEFAULT_SUBPROTOCOL, ConnectProtocol.serializeMetadata(workerState)); + return metadata; + } + + @Override + protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) { + assignmentSnapshot = ConnectProtocol.deserializeAssignment(memberAssignment); + // At this point we always consider ourselves to be a member of the cluster, even if there was an assignment + // error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned + // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get + // up to date, try to rejoin again, leaving the group and backing off, etc.). 
+ rejoinRequested = false; + listener.onAssigned(assignmentSnapshot); + } + + @Override + protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) { + log.debug("Performing task assignment"); + + Map<String, ConnectProtocol.WorkerState> allConfigs = new HashMap<>(); + for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet()) + allConfigs.put(entry.getKey(), ConnectProtocol.deserializeMetadata(entry.getValue())); + + long maxOffset = findMaxMemberConfigOffset(allConfigs); + Long leaderOffset = ensureLeaderConfig(maxOffset); + if (leaderOffset == null) + return fillAssignmentsAndSerialize(allConfigs.keySet(), ConnectProtocol.Assignment.CONFIG_MISMATCH, + leaderId, allConfigs.get(leaderId).url(), maxOffset, + new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>()); + return performTaskAssignment(leaderId, leaderOffset, allConfigs); + } + + private long findMaxMemberConfigOffset(Map<String, ConnectProtocol.WorkerState> allConfigs) { + // The new config offset is the maximum seen by any member. We always perform assignment using this offset, + // even if some members have fallen behind. The config offset used to generate the assignment is included in + // the response so members that have fallen behind will not use the assignment until they have caught up. 
+ Long maxOffset = null; + for (Map.Entry<String, ConnectProtocol.WorkerState> stateEntry : allConfigs.entrySet()) { + long memberRootOffset = stateEntry.getValue().offset(); + if (maxOffset == null) + maxOffset = memberRootOffset; + else + maxOffset = Math.max(maxOffset, memberRootOffset); + } + + log.debug("Max config offset root: {}, local snapshot config offsets root: {}", + maxOffset, configSnapshot.offset()); + return maxOffset; + } + + private Long ensureLeaderConfig(long maxOffset) { + // If this leader is behind some other members, we can't do assignment + if (configSnapshot.offset() < maxOffset) { + // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here. + // Alternatively, if this node has already passed the maximum reported by any other member of the group, it + // is also safe to use this newer state. + ClusterConfigState updatedSnapshot = configStorage.snapshot(); + if (updatedSnapshot.offset() < maxOffset) { + log.info("Was selected to perform assignments, but do not have latest config found in sync request. 
" + + "Returning an empty configuration to trigger re-sync."); + return null; + } else { + configSnapshot = updatedSnapshot; + return configSnapshot.offset(); + } + } + + return maxOffset; + } + + private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) { + Map<String, List<String>> connectorAssignments = new HashMap<>(); + Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>(); + + // Perform round-robin task assignment + CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet())); + for (String connectorId : sorted(configSnapshot.connectors())) { + String connectorAssignedTo = memberIt.next(); + log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo); + List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo); + if (memberConnectors == null) { + memberConnectors = new ArrayList<>(); + connectorAssignments.put(connectorAssignedTo, memberConnectors); + } + memberConnectors.add(connectorId); + + for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) { + String taskAssignedTo = memberIt.next(); + log.trace("Assigning task {} to {}", taskId, taskAssignedTo); + List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo); + if (memberTasks == null) { + memberTasks = new ArrayList<>(); + taskAssignments.put(taskAssignedTo, memberTasks); + } + memberTasks.add(taskId); + } + } + + return fillAssignmentsAndSerialize(allConfigs.keySet(), ConnectProtocol.Assignment.NO_ERROR, + leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments); + } + + private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members, + short error, + String leaderId, + String leaderUrl, + long maxOffset, + Map<String, List<String>> connectorAssignments, + Map<String, List<ConnectorTaskId>> taskAssignments) { + + Map<String, ByteBuffer> groupAssignment = 
new HashMap<>(); + for (String member : members) { + List<String> connectors = connectorAssignments.get(member); + if (connectors == null) + connectors = Collections.emptyList(); + List<ConnectorTaskId> tasks = taskAssignments.get(member); + if (tasks == null) + tasks = Collections.emptyList(); + ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks); + log.debug("Assignment: {} -> {}", member, assignment); + groupAssignment.put(member, ConnectProtocol.serializeAssignment(assignment)); + } + log.debug("Finished assignment"); + return groupAssignment; + } + + @Override + protected void onJoinPrepare(int generation, String memberId) { + log.debug("Revoking previous assignment {}", assignmentSnapshot); + if (assignmentSnapshot != null && !assignmentSnapshot.failed()) + listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks()); + } + + @Override + public boolean needRejoin() { + return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested; + } + + public String memberId() { + return this.memberId; + } + + @Override + public void close() { + super.close(); + } + + private class WorkerCoordinatorMetrics { + public final Metrics metrics; + public final String metricGrpName; + + public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) { + this.metrics = metrics; + this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; + + Measurable numConnectors = new Measurable() { + public double measure(MetricConfig config, long now) { + return assignmentSnapshot.connectors().size(); + } + }; + + Measurable numTasks = new Measurable() { + public double measure(MetricConfig config, long now) { + return assignmentSnapshot.tasks().size(); + } + }; + + metrics.addMetric(new MetricName("assigned-connectors", + this.metricGrpName, + "The number of connector instances 
currently assigned to this consumer", + tags), + numConnectors); + metrics.addMetric(new MetricName("assigned-tasks", + this.metricGrpName, + "The number of tasks currently assigned to this consumer", + tags), + numTasks); + } + } + + private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) { + List<T> res = new ArrayList<>(members); + Collections.sort(res); + return res; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java new file mode 100644 index 0000000..c72e3ef --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.storage.KafkaConfigStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This class manages the coordination process with brokers for the Connect cluster group membership. It ties together + * the Coordinator, which implements the group member protocol, with all the other pieces needed to drive the connection + * to the group coordinator broker. This isolates all the networking to a single thread managed by this class, with + * higher level operations in response to group membership events being handled by the herder. 
+ */ +public class WorkerGroupMember { + private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class); + + private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + private static final String JMX_PREFIX = "kafka.connect"; + + private final Time time; + private final String clientId; + private final ConsumerNetworkClient client; + private final Metrics metrics; + private final Metadata metadata; + private final long retryBackoffMs; + private final WorkerCoordinator coordinator; + + private boolean stopped = false; + + public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) { + try { + this.time = new SystemTime(); + + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); + String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); + clientId = clientIdConfig.length() <= 0 ? 
"connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; + List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); + reporters.add(new JmxReporter(JMX_PREFIX)); + this.metrics = new Metrics(metricConfig, reporters, time); + this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); + this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG)); + List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + this.metadata.update(Cluster.bootstrap(addresses), 0); + String metricGrpPrefix = "connect"; + Map<String, String> metricsTags = new LinkedHashMap<>(); + metricsTags.put("client-id", clientId); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); + NetworkClient netClient = new NetworkClient( + new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder), + this.metadata, + clientId, + 100, // a fixed large enough value will suffice + config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG), + config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG), + config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time); + this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); + this.coordinator = new WorkerCoordinator(this.client, + config.getString(DistributedConfig.GROUP_ID_CONFIG), + config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG), + config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG), + metrics, + metricGrpPrefix, + metricsTags, + this.time, + config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), + retryBackoffMs, + restUrl, + configStorage, + listener); + + 
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); + log.debug("Connect group member created"); + } catch (Throwable t) { + // call close methods if internal objects are already constructed + // this is to prevent resource leak. see KAFKA-2121 + stop(true); + // now propagate the exception + throw new KafkaException("Failed to construct kafka consumer", t); + } + } + + public void stop() { + if (stopped) return; + stop(false); + } + + public void ensureActive() { + coordinator.ensureCoordinatorKnown(); + coordinator.ensureActiveGroup(); + } + + public void poll(long timeout) { + if (timeout < 0) + throw new IllegalArgumentException("Timeout must not be negative"); + + // poll for new data until the timeout expires + long remaining = timeout; + while (remaining >= 0) { + long start = time.milliseconds(); + coordinator.ensureCoordinatorKnown(); + coordinator.ensureActiveGroup(); + client.poll(remaining); + remaining -= time.milliseconds() - start; + } + } + + /** + * Interrupt any running poll() calls, causing a WakeupException to be thrown in the thread invoking that method. + */ + public void wakeup() { + this.client.wakeup(); + } + + /** + * Get the member ID of this worker in the group of workers. + * + * This ID is the unique member ID automatically generated. 
+ * + * @return the member ID + */ + public String memberId() { + return coordinator.memberId(); + } + + public void requestRejoin() { + coordinator.requestRejoin(); + } + + private void stop(boolean swallowException) { + log.trace("Stopping the Connect group member."); + AtomicReference<Throwable> firstException = new AtomicReference<Throwable>(); + this.stopped = true; + ClientUtils.closeQuietly(coordinator, "coordinator", firstException); + ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); + ClientUtils.closeQuietly(client, "consumer network client", firstException); + AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId); + if (firstException.get() != null && !swallowException) + throw new KafkaException("Failed to stop the Connect group member", firstException.get()); + else + log.debug("The Connect group member has stopped."); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java new file mode 100644 index 0000000..40f55d2 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.connect.util.ConnectorTaskId; + +import java.util.Collection; + +/** + * Listener for rebalance events in the worker group. + */ +public interface WorkerRebalanceListener { + /** + * Invoked when a new assignment is created by joining the Connect worker group. This is invoked for both successful + * and unsuccessful assignments. + */ + void onAssigned(ConnectProtocol.Assignment assignment); + + /** + * Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks. + */ + void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java new file mode 100644 index 0000000..96346ad --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; +import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; +import org.apache.kafka.connect.runtime.rest.resources.RootResource; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.Slf4jRequestLog; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.server.handler.RequestLogHandler; +import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.jersey.server.ResourceConfig; +import 
org.glassfish.jersey.servlet.ServletContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; +import java.util.Map; + +/** + * Embedded server for the REST API that provides the control plane for Kafka Connect workers. + */ +public class RestServer { + private static final Logger log = LoggerFactory.getLogger(RestServer.class); + + private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000; + + private static final ObjectMapper JSON_SERDE = new ObjectMapper(); + + private final WorkerConfig config; + private Herder herder; + private Server jettyServer; + + /** + * Create a REST server for this herder using the specified configs. + */ + public RestServer(WorkerConfig config) { + this.config = config; + + // To make the advertised port available immediately, we need to do some configuration here + String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG); + Integer port = config.getInt(WorkerConfig.REST_PORT_CONFIG); + + jettyServer = new Server(); + + ServerConnector connector = new ServerConnector(jettyServer); + if (hostname != null && !hostname.isEmpty()) + connector.setHost(hostname); + connector.setPort(port); + jettyServer.setConnectors(new Connector[]{connector}); + } + + public void start(Herder herder) { + log.info("Starting REST server"); + + this.herder = herder; + + ResourceConfig resourceConfig = new ResourceConfig(); + resourceConfig.register(new JacksonJsonProvider()); + + resourceConfig.register(RootResource.class); + resourceConfig.register(new ConnectorsResource(herder)); + + resourceConfig.register(ConnectExceptionMapper.class); + + ServletContainer servletContainer = new ServletContainer(resourceConfig); + ServletHolder servletHolder = new ServletHolder(servletContainer); + 
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + context.addServlet(servletHolder, "/*"); + + RequestLogHandler requestLogHandler = new RequestLogHandler(); + Slf4jRequestLog requestLog = new Slf4jRequestLog(); + requestLog.setLoggerName(RestServer.class.getCanonicalName()); + requestLog.setLogLatency(true); + requestLogHandler.setRequestLog(requestLog); + + HandlerCollection handlers = new HandlerCollection(); + handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler}); + + /* Needed for graceful shutdown as per `setStopTimeout` documentation */ + StatisticsHandler statsHandler = new StatisticsHandler(); + statsHandler.setHandler(handlers); + jettyServer.setHandler(statsHandler); + jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS); + jettyServer.setStopAtShutdown(true); + + try { + jettyServer.start(); + } catch (Exception e) { + throw new ConnectException("Unable to start REST server", e); + } + + log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl()); + } + + public void stop() { + try { + jettyServer.stop(); + jettyServer.join(); + } catch (Exception e) { + throw new ConnectException("Unable to stop REST server", e); + } finally { + jettyServer.destroy(); + } + } + + /** + * Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty + * server, unless overrides for advertised hostname and/or port are provided via configs. 
+ */ + public String advertisedUrl() { + UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI()); + String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG); + if (advertisedHostname != null && !advertisedHostname.isEmpty()) + builder.host(advertisedHostname); + Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG); + if (advertisedPort != null) + builder.port(advertisedPort); + else + builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG)); + return builder.build().toString(); + } + + + /** + * @param url HTTP connection will be established with this url. + * @param method HTTP method ("GET", "POST", "PUT", etc.) + * @param requestBodyData Object to serialize as JSON and send in the request body. + * @param responseFormat Expected format of the response to the HTTP request. + * @param <T> The type of the deserialized response to the HTTP request. + * @return The deserialized response to the HTTP request, or null if no data is expected. + */ + public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData, + TypeReference<T> responseFormat) { + HttpURLConnection connection = null; + try { + String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData); + log.debug("Sending {} with input {} to {}", method, serializedBody, url); + + connection = (HttpURLConnection) new URL(url).openConnection(); + connection.setRequestMethod(method); + + connection.setRequestProperty("User-Agent", "kafka-connect"); + connection.setRequestProperty("Accept", "application/json"); + + // connection.getResponseCode() implicitly calls getInputStream, so always set to true. + // On the other hand, leaving this out breaks nothing. 
+ connection.setDoInput(true); + + connection.setUseCaches(false); + + if (requestBodyData != null) { + connection.setRequestProperty("Content-Type", "application/json"); + connection.setDoOutput(true); + + OutputStream os = connection.getOutputStream(); + os.write(serializedBody.getBytes()); + os.flush(); + os.close(); + } + + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) { + return new HttpResponse<>(responseCode, connection.getHeaderFields(), null); + } else if (responseCode >= 400) { + InputStream es = connection.getErrorStream(); + ErrorMessage errorMessage = JSON_SERDE.readValue(es, ErrorMessage.class); + es.close(); + throw new ConnectRestException(responseCode, errorMessage.errorCode(), errorMessage.message()); + } else if (responseCode >= 200 && responseCode < 300) { + InputStream is = connection.getInputStream(); + T result = JSON_SERDE.readValue(is, responseFormat); + is.close(); + return new HttpResponse<>(responseCode, connection.getHeaderFields(), result); + } else { + throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), + "Unexpected status code when handling forwarded request: " + responseCode); + } + } catch (IOException e) { + log.error("IO error forwarding REST request: ", e); + throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e); + } finally { + if (connection != null) + connection.disconnect(); + } + } + + public static class HttpResponse<T> { + private int status; + private Map<String, List<String>> headers; + private T body; + + public HttpResponse(int status, Map<String, List<String>> headers, T body) { + this.status = status; + this.headers = headers; + this.body = body; + } + + public int status() { + return status; + } + + public Map<String, List<String>> headers() { + return headers; + } + + public T body() { + return body; 
+ } + } + + public static String urlJoin(String base, String path) { + if (base.endsWith("/") && path.startsWith("/")) + return base + path.substring(1); + else + return base + path; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java new file mode 100644 index 0000000..8daae05 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
**/

package org.apache.kafka.connect.runtime.rest.entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.connect.util.ConnectorTaskId;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * REST entity holding a connector's name, its configuration and the ids of its tasks. The constructor
 * and accessors carry Jackson annotations under the property names "name", "config" and "tasks".
 * The map and list are stored and returned by reference, without copying.
 */
public class ConnectorInfo {

    private final String name;
    private final Map<String, String> config;
    private final List<ConnectorTaskId> tasks;

    /**
     * @param name   the connector name
     * @param config the connector configuration
     * @param tasks  the ids of the connector's tasks
     */
    @JsonCreator
    public ConnectorInfo(@JsonProperty("name") String name,
                         @JsonProperty("config") Map<String, String> config,
                         @JsonProperty("tasks") List<ConnectorTaskId> tasks) {
        this.name = name;
        this.config = config;
        this.tasks = tasks;
    }

    /** @return the connector name */
    @JsonProperty
    public String name() {
        return this.name;
    }

    /** @return the connector configuration */
    @JsonProperty
    public Map<String, String> config() {
        return this.config;
    }

    /** @return the ids of the connector's tasks */
    @JsonProperty
    public List<ConnectorTaskId> tasks() {
        return this.tasks;
    }

    /**
     * Two instances are equal when they are of the same class and their name, config and tasks are
     * all equal.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        ConnectorInfo other = (ConnectorInfo) o;
        if (!Objects.equals(name, other.name)) {
            return false;
        }
        if (!Objects.equals(config, other.config)) {
            return false;
        }
        return Objects.equals(tasks, other.tasks);
    }

    /** Hash over the same three fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(name, config, tasks);
    }


    // Copies the given task ids into a new list, in iteration order. Not called from within this class.
    private static List<ConnectorTaskId> jsonTasks(Collection<org.apache.kafka.connect.util.ConnectorTaskId> tasks) {
        return new ArrayList<>(tasks);
    }
}