http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java new file mode 100644 index 0000000..6fdefdf --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.errors; + +/** + * Indicates the operation tried to create an entity that already exists. + */ +public class AlreadyExistsException extends ConnectException { + public AlreadyExistsException(String s) { + super(s); + } + + public AlreadyExistsException(String s, Throwable throwable) { + super(s, throwable); + } + + public AlreadyExistsException(Throwable throwable) { + super(throwable); + } +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java new file mode 100644 index 0000000..a3bbe91 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.errors; + +/** + * Indicates that an operation attempted to modify or delete a connector or task that is not present on the worker. 
+ */ +public class NotFoundException extends ConnectException { + public NotFoundException(String s) { + super(s); + } + + public NotFoundException(String s, Throwable throwable) { + super(s, throwable); + } + + public NotFoundException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java new file mode 100644 index 0000000..1b5b07a --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.errors; + +/** + * An exception that indicates the operation can be reattempted. 
+ */ +public class RetriableException extends ConnectException { + public RetriableException(String s) { + super(s); + } + + public RetriableException(String s, Throwable throwable) { + super(s, throwable); + } + + public RetriableException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java new file mode 100644 index 0000000..6611e5d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class ties together all the components of a Kafka Connect process (herder, worker, + * storage, command interface), managing their lifecycle. + */ +@InterfaceStability.Unstable +public class Connect { + private static final Logger log = LoggerFactory.getLogger(Connect.class); + + private final Worker worker; + private final Herder herder; + private final RestServer rest; + private final CountDownLatch startLatch = new CountDownLatch(1); + private final CountDownLatch stopLatch = new CountDownLatch(1); + private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final ShutdownHook shutdownHook; + + public Connect(Worker worker, Herder herder, RestServer rest) { + log.debug("Kafka Connect instance created"); + this.worker = worker; + this.herder = herder; + this.rest = rest; + shutdownHook = new ShutdownHook(); + } + + public void start() { + log.info("Kafka Connect starting"); + Runtime.getRuntime().addShutdownHook(shutdownHook); + + worker.start(); + herder.start(); + rest.start(herder); + + log.info("Kafka Connect started"); + + startLatch.countDown(); + } + + public void stop() { + boolean wasShuttingDown = shutdown.getAndSet(true); + if (!wasShuttingDown) { + log.info("Kafka Connect stopping"); + + rest.stop(); + herder.stop(); + worker.stop(); + + log.info("Kafka Connect stopped"); + } + + stopLatch.countDown(); + } + + public void awaitStop() { + try { + stopLatch.await(); + } catch (InterruptedException e) { + log.error("Interrupted waiting for Kafka Connect to shutdown"); + } + } + + private class ShutdownHook extends Thread { + @Override + public void run() { + try { + startLatch.await(); + 
Connect.this.stop(); + } catch (InterruptedException e) { + log.error("Interrupted in shutdown hook while waiting for Kafka Connect startup to finish"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java new file mode 100644 index 0000000..77cfc8d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import java.util.HashMap; +import java.util.Map; + +/** + * <p> + * Configuration options for Connectors. 
These only include Kafka Connect system-level configuration + * options (e.g. Connector class name, timeouts used by Connect to control the connector) but does + * not include Connector-specific options (e.g. database connection settings). + * </p> + * <p> + * Note that some of these options are not required for all connectors. For example TOPICS_CONFIG + * is sink-specific. + * </p> + */ +public class ConnectorConfig extends AbstractConfig { + + public static final String NAME_CONFIG = "name"; + private static final String NAME_DOC = "Globally unique name to use for this connector."; + + public static final String CONNECTOR_CLASS_CONFIG = "connector.class"; + private static final String CONNECTOR_CLASS_DOC = + "Name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector"; + + public static final String TASKS_MAX_CONFIG = "tasks.max"; + private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; + public static final int TASKS_MAX_DEFAULT = 1; + + public static final String TOPICS_CONFIG = "topics"; + private static final String TOPICS_DOC = ""; + public static final String TOPICS_DEFAULT = ""; + + private static ConfigDef config; + + static { + config = new ConfigDef() + .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC) + .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC) + .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC) + .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC); + } + + public ConnectorConfig() { + this(new HashMap<String, String>()); + } + + public ConnectorConfig(Map<String, String> props) { + super(config, props); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java ---------------------------------------------------------------------- diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java new file mode 100644 index 0000000..fc0689c --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; +import org.apache.kafka.connect.util.Callback; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * <p> + * The herder interface tracks and manages workers and connectors. It is the main interface for external components + * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class + * knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so + * the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one + * of the workers. 
+ * </p> + * <p> + * This class must implement all the actions that can be taken on the cluster (add/remove connectors, pause/resume tasks, + * get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very simple + * wrappers of the functionality provided by this interface. + * </p> + * <p> + * In standalone mode, the implementation of this class will be trivial because no coordination is needed. In that case, + * the implementation will mainly be delegating tasks directly to other components. For example, when creating a new + * connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the + * same process, so the standalone herder implementation can immediately instantiate and start the connector and its + * tasks. + * </p> + */ +public interface Herder { + + void start(); + + void stop(); + + /** + * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered + * from the current configuration. However, note + * + * @return A list of connector names + * @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException if this node cannot resolve the request + * (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is + * also not the leader + * @throws ConnectException if this node is the leader, but still cannot resolve the + * request (e.g., it is not in sync with other workers' config state) + */ + void connectors(Callback<Collection<String>> callback); + + /** + * Get the definition and status of a connector. + */ + void connectorInfo(String connName, Callback<ConnectorInfo> callback); + + /** + * Get the configuration for a connector. 
+ * @param connName name of the connector + * @param callback callback to invoke with the configuration + */ + void connectorConfig(String connName, Callback<Map<String, String>> callback); + + /** + * Set the configuration for a connector. This supports creation, update, and deletion. + * @param connName name of the connector + * @param config the connector's configuration, or null if deleting the connector + * @param allowReplace if true, allow overwriting previous configs; if false, throw AlreadyExistsException if a connector + * with the same name already exists + * @param callback callback to invoke when the configuration has been written + */ + void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback); + + /** + * Requests reconfiguration of the task. This should only be triggered by + * {@link HerderConnectorContext}. + * + * @param connName name of the connector that should be reconfigured + */ + void requestTaskReconfiguration(String connName); + + /** + * Get the configurations for the current set of tasks of a connector. + * @param connName connector whose task configurations should be returned + * @param callback callback to invoke upon completion + */ + void taskConfigs(String connName, Callback<List<TaskInfo>> callback); + + /** + * Set the configurations for the tasks of a connector. This should always include all tasks in the connector; if + * there are existing configurations and fewer are provided, this will reduce the number of tasks, and if more are + * provided it will increase the number of tasks. 
+ * @param connName connector to update + * @param configs list of configurations + * @param callback callback to invoke upon completion + */ + void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback); + + + class Created<T> { + private final boolean created; + private final T result; + + public Created(boolean created, T result) { + this.created = created; + this.result = result; + } + + public boolean created() { + return created; + } + + public T result() { + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Created<?> created1 = (Created<?>) o; + return Objects.equals(created, created1.created) && + Objects.equals(result, created1.result); + } + + @Override + public int hashCode() { + return Objects.hash(created, result); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java new file mode 100644 index 0000000..070aa20 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.connector.ConnectorContext; + +/** + * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks + * in a single process. + */ +public class HerderConnectorContext implements ConnectorContext { + + private Herder herder; + private String connectorName; + + public HerderConnectorContext(Herder herder, String connectorName) { + this.herder = herder; + this.connectorName = connectorName; + } + + @Override + public void requestTaskReconfiguration() { + // This is trivial to forward since there is only one herder and it's in memory in this + // process + herder.requestTaskReconfiguration(connectorName); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java new file mode 100644 index 0000000..bee24e7 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * <p> + * Manages offset commit scheduling and execution for SourceTasks. + * </p> + * <p> + * Unlike sink tasks which directly manage their offset commits in the main poll() thread since + * they drive the event loop and control (for all intents and purposes) the timeouts, source + * tasks are at the whim of the connector and cannot be guaranteed to wake up on the necessary + * schedule. Instead, this class tracks all the active tasks, their schedule for commits, and + * ensures they are invoked in a timely fashion. 
+ * </p> + */ +class SourceTaskOffsetCommitter { + private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class); + + private Time time; + private WorkerConfig config; + private ScheduledExecutorService commitExecutorService = null; + private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>(); + + SourceTaskOffsetCommitter(Time time, WorkerConfig config) { + this.time = time; + this.config = config; + commitExecutorService = Executors.newSingleThreadScheduledExecutor(); + } + + public void close(long timeoutMs) { + commitExecutorService.shutdown(); + try { + if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { + log.error("Graceful shutdown of offset commitOffsets thread timed out."); + } + } catch (InterruptedException e) { + // ignore and allow to exit immediately + } + } + + public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) { + synchronized (committers) { + long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() { + @Override + public void run() { + commit(id, workerTask); + } + }, commitIntervalMs, TimeUnit.MILLISECONDS); + committers.put(id, new ScheduledCommitTask(commitFuture)); + } + } + + public void remove(ConnectorTaskId id) { + final ScheduledCommitTask task; + synchronized (committers) { + task = committers.remove(id); + task.cancelled = true; + task.commitFuture.cancel(false); + } + if (task.finishedLatch != null) { + try { + task.finishedLatch.await(); + } catch (InterruptedException e) { + throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter.", e); + } + } + } + + public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) { + final ScheduledCommitTask task; + synchronized (committers) { + task = committers.get(id); + if (task == null || task.cancelled) + return; + task.finishedLatch = new 
CountDownLatch(1); + } + + try { + log.debug("Committing offsets for {}", workerTask); + boolean success = workerTask.commitOffsets(); + if (!success) { + log.error("Failed to commit offsets for {}", workerTask); + } + } catch (Throwable t) { + // We're very careful about exceptions here since any uncaught exceptions in the commit + // thread would cause the fixed interval schedule on the ExecutorService to stop running + // for that task + log.error("Unhandled exception when committing {}: ", workerTask, t); + } finally { + synchronized (committers) { + task.finishedLatch.countDown(); + if (!task.cancelled) + schedule(id, workerTask); + } + } + } + + private static class ScheduledCommitTask { + ScheduledFuture<?> commitFuture; + boolean cancelled; + CountDownLatch finishedLatch; + + ScheduledCommitTask(ScheduledFuture<?> commitFuture) { + this.commitFuture = commitFuture; + this.cancelled = false; + this.finishedLatch = null; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java new file mode 100644 index 0000000..48cb4d8 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import java.util.HashMap; +import java.util.Map; + +/** + * <p> + * Configuration options for Tasks. These only include Kafka Connect system-level configuration + * options. + * </p> + */ +public class TaskConfig extends AbstractConfig { + + public static final String TASK_CLASS_CONFIG = "task.class"; + private static final String TASK_CLASS_DOC = + "Name of the class for this task. 
Must be a subclass of org.apache.kafka.connect.connector.Task"; + + private static ConfigDef config; + + static { + config = new ConfigDef() + .define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC); + } + + public TaskConfig() { + this(new HashMap<String, String>()); + } + + public TaskConfig(Map<String, ?> props) { + super(config, props); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java new file mode 100644 index 0000000..2e359d6 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.connect.storage.*; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * <p> + * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving + * data to/from Kafka. + * </p> + * <p> + * Since each task has a dedicated thread, this is mainly just a container for them. 
+ * </p> + */ +public class Worker { + private static final Logger log = LoggerFactory.getLogger(Worker.class); + + private Time time; + private WorkerConfig config; + private Converter keyConverter; + private Converter valueConverter; + private Converter internalKeyConverter; + private Converter internalValueConverter; + private OffsetBackingStore offsetBackingStore; + private HashMap<String, Connector> connectors = new HashMap<>(); + private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>(); + private KafkaProducer<byte[], byte[]> producer; + private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; + + public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) { + this(new SystemTime(), config, offsetBackingStore); + } + + @SuppressWarnings("unchecked") + public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) { + this.time = time; + this.config = config; + this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); + this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true); + this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false); + this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class); + this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true); + this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false); + + this.offsetBackingStore = offsetBackingStore; + this.offsetBackingStore.configure(config.originals()); + } + + public void start() { + log.info("Worker starting"); + + Map<String, Object> producerProps = new 
HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.putAll(config.unusedConfigs()); + + producer = new KafkaProducer<>(producerProps); + + offsetBackingStore.start(); + sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config); + + log.info("Worker started"); + } + + public void stop() { + log.info("Worker stopping"); + + long started = time.milliseconds(); + long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); + + for (Map.Entry<String, Connector> entry : connectors.entrySet()) { + Connector conn = entry.getValue(); + log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" + + "Worker is stopped.", conn); + try { + conn.stop(); + } catch (ConnectException e) { + log.error("Error while shutting down connector " + conn, e); + } + } + + for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) { + WorkerTask task = entry.getValue(); + log.warn("Shutting down task {} uncleanly; herder should have shut down " + + "tasks before the Worker is stopped.", task); + try { + task.stop(); + } catch (ConnectException e) { + log.error("Error while shutting down task " + task, e); + } + } + + for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) { + WorkerTask task = entry.getValue(); + log.debug("Waiting for task {} to finish shutting down", task); + if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0))) + log.error("Graceful shutdown of task {} failed.", task); + task.close(); + } + + long timeoutMs = limit - time.milliseconds(); + sourceTaskOffsetCommitter.close(timeoutMs); + + 
offsetBackingStore.stop(); + + log.info("Worker stopped"); + } + + public WorkerConfig config() { + return config; + } + + /** + * Add a new connector. + * @param connConfig connector configuration + * @param ctx context for the connector + */ + public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) { + String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); + Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + log.info("Creating connector {} of type {}", connName, maybeConnClass.getName()); + + Class<? extends Connector> connClass; + try { + connClass = maybeConnClass.asSubclass(Connector.class); + } catch (ClassCastException e) { + throw new ConnectException("Specified class is not a subclass of Connector: " + maybeConnClass.getName()); + } + + if (connectors.containsKey(connName)) + throw new ConnectException("Connector with name " + connName + " already exists"); + + final Connector connector = instantiateConnector(connClass); + log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName()); + connector.initialize(ctx); + try { + connector.start(connConfig.originalsStrings()); + } catch (ConnectException e) { + throw new ConnectException("Connector threw an exception while starting", e); + } + + connectors.put(connName, connector); + + log.info("Finished creating connector {}", connName); + } + + private static Connector instantiateConnector(Class<? 
extends Connector> connClass) { + try { + return Utils.newInstance(connClass); + } catch (Throwable t) { + // Catches normal exceptions due to instantiation errors as well as any runtime errors that + // may be caused by user code + throw new ConnectException("Failed to create connector instance", t); + } + } + + public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) { + log.trace("Reconfiguring connector tasks for {}", connName); + + Connector connector = connectors.get(connName); + if (connector == null) + throw new ConnectException("Connector " + connName + " not found in this worker."); + + List<Map<String, String>> result = new ArrayList<>(); + String taskClassName = connector.taskClass().getName(); + for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) { + Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config + taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); + if (sinkTopics != null) + taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); + result.add(taskConfig); + } + return result; + } + + public void stopConnector(String connName) { + log.info("Stopping connector {}", connName); + + Connector connector = connectors.get(connName); + if (connector == null) + throw new ConnectException("Connector " + connName + " not found in this worker."); + + try { + connector.stop(); + } catch (ConnectException e) { + log.error("Error shutting down connector {}: ", connector, e); + } + + connectors.remove(connName); + + log.info("Stopped connector {}", connName); + } + + /** + * Get the IDs of the connectors currently running in this worker. + */ + public Set<String> connectorNames() { + return connectors.keySet(); + } + + /** + * Add a new task. + * @param id Globally unique ID for this task. 
+ * @param taskConfig the parsed task configuration + */ + public void addTask(ConnectorTaskId id, TaskConfig taskConfig) { + log.info("Creating task {}", id); + + if (tasks.containsKey(id)) { + String msg = "Task already exists in this worker; the herder should not have requested " + + "that this : " + id; + log.error(msg); + throw new ConnectException(msg); + } + + Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); + final Task task = instantiateTask(taskClass); + log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); + + // Decide which type of worker task we need based on the type of task. + final WorkerTask workerTask; + if (task instanceof SourceTask) { + SourceTask sourceTask = (SourceTask) task; + OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), + internalKeyConverter, internalValueConverter); + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), + internalKeyConverter, internalValueConverter); + workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer, + offsetReader, offsetWriter, config, time); + } else if (task instanceof SinkTask) { + workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time); + } else { + log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); + throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask"); + } + + // Start the task before adding modifying any state, any exceptions are caught higher up the + // call chain and there's no cleanup to do here + workerTask.start(taskConfig.originalsStrings()); + if (task instanceof SourceTask) { + WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask; + sourceTaskOffsetCommitter.schedule(id, workerSourceTask); + } + tasks.put(id, workerTask); + } + + private static 
Task instantiateTask(Class<? extends Task> taskClass) { + try { + return Utils.newInstance(taskClass); + } catch (KafkaException e) { + throw new ConnectException("Task class not found", e); + } + } + + public void stopTask(ConnectorTaskId id) { + log.info("Stopping task {}", id); + + WorkerTask task = getTask(id); + if (task instanceof WorkerSourceTask) + sourceTaskOffsetCommitter.remove(id); + task.stop(); + if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) + log.error("Graceful stop of task {} failed.", task); + task.close(); + tasks.remove(id); + } + + /** + * Get the IDs of the tasks currently running in this worker. + */ + public Set<ConnectorTaskId> taskIds() { + return tasks.keySet(); + } + + private WorkerTask getTask(ConnectorTaskId id) { + WorkerTask task = tasks.get(id); + if (task == null) { + log.error("Task not found: " + id); + throw new ConnectException("Task not found: " + id); + } + return task; + } + + public Converter getInternalKeyConverter() { + return internalKeyConverter; + } + + public Converter getInternalValueConverter() { + return internalValueConverter; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java new file mode 100644 index 0000000..4ecacbb --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import java.util.Map; + +/** + * Common base class providing configuration for Kafka Connect workers, whether standalone or distributed. + */ +@InterfaceStability.Unstable +public class WorkerConfig extends AbstractConfig { + + public static final String CLUSTER_CONFIG = "cluster"; + private static final String CLUSTER_CONFIG_DOC = + "ID for this cluster, which is used to provide a namespace so multiple Kafka Connect clusters " + + "or instances may co-exist while sharing a single Kafka cluster."; + public static final String CLUSTER_DEFAULT = "connect"; + + public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; + public static final String BOOTSTRAP_SERVERS_DOC + = "A list of host/port pairs to use for establishing the initial connection to the Kafka " + + "cluster. The client will make use of all servers irrespective of which servers are " + + "specified here for bootstrapping—this list only impacts the initial hosts used " + + "to discover the full set of servers. 
This list should be in the form " + + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the " + + "initial connection to discover the full cluster membership (which may change " + + "dynamically), this list need not contain the full set of servers (you may want more " + + "than one, though, in case a server is down)."; + public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; + + public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter"; + public static final String KEY_CONVERTER_CLASS_DOC = + "Converter class for key Connect data that implements the <code>Converter</code> interface."; + + public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter"; + public static final String VALUE_CONVERTER_CLASS_DOC = + "Converter class for value Connect data that implements the <code>Converter</code> interface."; + + public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter"; + public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC = + "Converter class for internal key Connect data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs."; + + public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter"; + public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC = + "Converter class for offset value Connect data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs."; + + public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG + = "task.shutdown.graceful.timeout.ms"; + private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC = + "Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time," + + " not per task. 
All task have shutdown triggered, then they are waited on sequentially."; + private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000"; + + public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms"; + private static final String OFFSET_COMMIT_INTERVAL_MS_DOC + = "Interval at which to try committing offsets for tasks."; + public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L; + + public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms"; + private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC + = "Maximum number of milliseconds to wait for records to flush and partition offset data to be" + + " committed to offset storage before cancelling the process and restoring the offset " + + "data to be committed in a future attempt."; + public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L; + + public static final String REST_HOST_NAME_CONFIG = "rest.host.name"; + private static final String REST_HOST_NAME_DOC + = "Hostname for the REST API. If this is set, it will only bind to this interface."; + + public static final String REST_PORT_CONFIG = "rest.port"; + private static final String REST_PORT_DOC + = "Port for the REST API to listen on."; + public static final int REST_PORT_DEFAULT = 8083; + + public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name"; + private static final String REST_ADVERTISED_HOST_NAME_DOC + = "If this is set, this is the hostname that will be given out to other workers to connect to."; + + public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port"; + private static final String REST_ADVERTISED_PORT_DOC + = "If this is set, this is the port that will be given out to other workers to connect to."; + + /** + * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to + * bootstrap their own ConfigDef. 
+ * @return a ConfigDef with all the common options specified + */ + protected static ConfigDef baseConfigDef() { + return new ConfigDef() + .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC) + .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT, + Importance.HIGH, BOOTSTRAP_SERVERS_DOC) + .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, KEY_CONVERTER_CLASS_DOC) + .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) + .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC) + .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC) + .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, + TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, + TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) + .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT, + Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC) + .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, + Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC) + .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC) + .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC) + .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC) + .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC); + } + + public WorkerConfig(ConfigDef definition, Map<String, String> props) { + super(definition, props); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java new file mode 100644 index 0000000..a4d4093 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * WorkerTask that uses a SinkTask to export data from Kafka. 
+ */ +class WorkerSinkTask implements WorkerTask { + private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); + + private final ConnectorTaskId id; + private final SinkTask task; + private final WorkerConfig workerConfig; + private final Time time; + private final Converter keyConverter; + private final Converter valueConverter; + private WorkerSinkTaskThread workThread; + private Map<String, String> taskProps; + private KafkaConsumer<byte[], byte[]> consumer; + private WorkerSinkTaskContext context; + private boolean started; + private final List<SinkRecord> messageBatch; + private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets; + private Map<TopicPartition, OffsetAndMetadata> currentOffsets; + private boolean pausedForRedelivery; + + public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, + Converter keyConverter, Converter valueConverter, Time time) { + this.id = id; + this.task = task; + this.workerConfig = workerConfig; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + this.time = time; + this.started = false; + this.messageBatch = new ArrayList<>(); + this.currentOffsets = new HashMap<>(); + this.pausedForRedelivery = false; + } + + @Override + public void start(Map<String, String> props) { + taskProps = props; + consumer = createConsumer(); + context = new WorkerSinkTaskContext(consumer); + + workThread = createWorkerThread(); + workThread.start(); + } + + @Override + public void stop() { + // Offset commit is handled upon exit in work thread + if (workThread != null) + workThread.startGracefulShutdown(); + consumer.wakeup(); + } + + @Override + public boolean awaitStop(long timeoutMs) { + boolean success = true; + if (workThread != null) { + try { + success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); + if (!success) + workThread.forceShutdown(); + } catch (InterruptedException e) { + success = false; + } + } + task.stop(); + return success; + } + + 
@Override + public void close() { + // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout + // passed in + if (consumer != null) + consumer.close(); + } + + /** + * Preforms initial join process for consumer group, ensures we have an assignment, and initializes + starts the + * SinkTask. + * + * @returns true if successful, false if joining the consumer group was interrupted + */ + public boolean joinConsumerGroupAndStart() { + String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG); + if (topicsStr == null || topicsStr.isEmpty()) + throw new ConnectException("Sink tasks require a list of topics."); + String[] topics = topicsStr.split(","); + log.debug("Task {} subscribing to topics {}", id, topics); + consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); + + // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions + // to work with. Any rewinding will be handled immediately when polling starts. + try { + consumer.poll(0); + } catch (WakeupException e) { + log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this); + return false; + } + task.initialize(context); + task.start(taskProps); + log.info("Sink task {} finished initialization and start", this); + started = true; + return true; + } + + /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. 
*/ + public void poll(long timeoutMs) { + try { + rewind(); + long retryTimeout = context.timeout(); + if (retryTimeout > 0) { + timeoutMs = Math.min(timeoutMs, retryTimeout); + context.timeout(-1L); + } + + log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); + ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs); + assert messageBatch.isEmpty() || msgs.isEmpty(); + log.trace("{} polling returned {} messages", id, msgs.count()); + + convertMessages(msgs); + deliverMessages(); + } catch (WakeupException we) { + log.trace("{} consumer woken up", id); + } + } + + /** + * Starts an offset commit by flushing outstanding messages from the task and then starting + * the write commit. This should only be invoked by the WorkerSinkTaskThread. + **/ + public void commitOffsets(boolean sync, final int seqno) { + log.info("{} Committing offsets", this); + + final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets); + + try { + task.flush(offsets); + } catch (Throwable t) { + log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t); + log.error("Rewinding offsets to last committed offsets"); + for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) { + log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); + consumer.seek(entry.getKey(), entry.getValue().offset()); + } + currentOffsets = new HashMap<>(lastCommittedOffsets); + workThread.onCommitCompleted(t, seqno); + return; + } + + if (sync) { + try { + consumer.commitSync(offsets); + lastCommittedOffsets = offsets; + workThread.onCommitCompleted(null, seqno); + } catch (KafkaException e) { + workThread.onCommitCompleted(e, seqno); + } + } else { + OffsetCommitCallback cb = new OffsetCommitCallback() { + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) { + lastCommittedOffsets = offsets; + 
workThread.onCommitCompleted(error, seqno); + } + }; + consumer.commitAsync(offsets, cb); + } + } + + public Time time() { + return time; + } + + public WorkerConfig workerConfig() { + return workerConfig; + } + + private KafkaConsumer<byte[], byte[]> createConsumer() { + // Include any unknown worker configs so consumer configs can be set globally on the worker + // and through to the task + Map<String, Object> props = workerConfig.unusedConfigs(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector()); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + KafkaConsumer<byte[], byte[]> newConsumer; + try { + newConsumer = new KafkaConsumer<>(props); + } catch (Throwable t) { + throw new ConnectException("Failed to create consumer", t); + } + + return newConsumer; + } + + private WorkerSinkTaskThread createWorkerThread() { + return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig); + } + + private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) { + for (ConsumerRecord<byte[], byte[]> msg : msgs) { + log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); + SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key()); + SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value()); + messageBatch.add( + new SinkRecord(msg.topic(), msg.partition(), + keyAndSchema.schema(), keyAndSchema.value(), + valueAndSchema.schema(), valueAndSchema.value(), + msg.offset()) + ); + } + } + + private void 
deliverMessages() { + // Finally, deliver this batch to the sink + try { + // Since we reuse the messageBatch buffer, ensure we give the task its own copy + task.put(new ArrayList<>(messageBatch)); + for (SinkRecord record : messageBatch) + currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()), + new OffsetAndMetadata(record.kafkaOffset() + 1)); + messageBatch.clear(); + // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that + // the task had not explicitly paused + if (pausedForRedelivery) { + for (TopicPartition tp : consumer.assignment()) + if (!context.pausedPartitions().contains(tp)) + consumer.resume(tp); + pausedForRedelivery = false; + } + } catch (RetriableException e) { + log.error("RetriableException from SinkTask {}: {}", id, e); + // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data, + // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc. + pausedForRedelivery = true; + for (TopicPartition tp : consumer.assignment()) + consumer.pause(tp); + // Let this exit normally, the batch will be reprocessed on the next loop. 
+ } catch (Throwable t) { + log.error("Task {} threw an uncaught and unrecoverable exception", id); + log.error("Task is being killed and will not recover until manually restarted:", t); + throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception."); + } + } + + private void rewind() { + Map<TopicPartition, Long> offsets = context.offsets(); + if (offsets.isEmpty()) { + return; + } + for (TopicPartition tp: offsets.keySet()) { + Long offset = offsets.get(tp); + if (offset != null) { + log.trace("Rewind {} to offset {}.", tp, offset); + consumer.seek(tp, offset); + lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset)); + currentOffsets.put(tp, new OffsetAndMetadata(offset)); + } + } + context.clearOffsets(); + } + + private class HandleRebalance implements ConsumerRebalanceListener { + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + lastCommittedOffsets = new HashMap<>(); + currentOffsets = new HashMap<>(); + for (TopicPartition tp : partitions) { + long pos = consumer.position(tp); + lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos)); + currentOffsets.put(tp, new OffsetAndMetadata(pos)); + log.debug("{} assigned topic partition {} with offset {}", id, tp, pos); + } + + // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make + // sure anything we paused that the task didn't request to be paused *and* which we still own is resumed. + // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own. 
+ if (pausedForRedelivery) { + pausedForRedelivery = false; + Set<TopicPartition> assigned = new HashSet<>(partitions); + Set<TopicPartition> taskPaused = context.pausedPartitions(); + + for (TopicPartition tp : partitions) { + if (!taskPaused.contains(tp)) + consumer.resume(tp); + } + + Iterator<TopicPartition> tpIter = taskPaused.iterator(); + while (tpIter.hasNext()) { + TopicPartition tp = tpIter.next(); + if (assigned.contains(tp)) + tpIter.remove(); + } + } + + // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon + // task start. Since this callback gets invoked during that initial setup before we've started the task, we + // need to guard against invoking the user's callback method during that period. + if (started) + task.onPartitionsAssigned(partitions); + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + task.onPartitionsRevoked(partitions); + commitOffsets(true, -1); + // Make sure we don't have any leftover data since offsets will be reset to committed positions + messageBatch.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java new file mode 100644 index 0000000..06f4838 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/> Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + **/

package org.apache.kafka.connect.runtime;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * {@link SinkTaskContext} implementation backed by a {@link KafkaConsumer}. It stores the offset
 * resets and the timeout submitted through the context, delegates assignment lookups and
 * pause/resume requests to the consumer, and keeps track of the partitions paused through this
 * context.
 */
public class WorkerSinkTaskContext implements SinkTaskContext {
    private final Map<TopicPartition, Long> offsets;
    private long timeoutMs;
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final Set<TopicPartition> pausedPartitions;

    /**
     * Create a context with no pending offset resets, a timeout of -1 and no paused partitions.
     *
     * @param consumer the consumer that assignment, pause and resume calls are delegated to; if it
     *                 is {@code null} those calls throw {@link IllegalWorkerStateException}
     */
    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
        this.offsets = new HashMap<>();
        this.timeoutMs = -1L;
        this.consumer = consumer;
        this.pausedPartitions = new HashSet<>();
    }

    /**
     * Record offset resets for several partitions. Entries are merged into the pending resets,
     * replacing any earlier entry for the same partition.
     *
     * @param offsets the partition-to-offset entries to add
     */
    @Override
    public void offset(Map<TopicPartition, Long> offsets) {
        this.offsets.putAll(offsets);
    }

    /**
     * Record an offset reset for a single partition, replacing any earlier entry for it.
     *
     * @param tp the partition to reset
     * @param offset the offset to reset it to
     */
    @Override
    public void offset(TopicPartition tp, long offset) {
        offsets.put(tp, offset);
    }

    /**
     * Discard all pending offset resets.
     */
    public void clearOffsets() {
        offsets.clear();
    }

    /**
     * Get offsets that the SinkTask has submitted to be reset. Used by the Kafka Connect framework.
     * The returned map is the context's own mutable map, not a copy.
     *
     * @return the map of offsets
     */
    public Map<TopicPartition, Long> offsets() {
        return offsets;
    }

    /**
     * Store the timeout requested through the context.
     *
     * @param timeoutMs the timeout in milliseconds
     */
    @Override
    public void timeout(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /**
     * Get the timeout in milliseconds set by SinkTasks. Used by the Kafka Connect framework.
     *
     * @return the backoff timeout in milliseconds; -1 until {@link #timeout(long)} has been called
     */
    public long timeout() {
        return timeoutMs;
    }

    /**
     * Look up the consumer's current partition assignment.
     *
     * @return the result of {@code consumer.assignment()}
     * @throws IllegalWorkerStateException if this context was created without a consumer
     */
    @Override
    public Set<TopicPartition> assignment() {
        if (consumer == null) {
            throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized");
        }
        return consumer.assignment();
    }

    /**
     * Record the given partitions as paused and pause them on the consumer.
     *
     * @param partitions the partitions to pause
     * @throws IllegalWorkerStateException if this context was created without a consumer, or if
     *         the consumer rejects the request with an {@link IllegalStateException}
     */
    @Override
    public void pause(TopicPartition... partitions) {
        if (consumer == null) {
            throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
        }
        try {
            // NOTE(review): the partitions are recorded before consumer.pause() runs, so they stay
            // recorded as paused if that call throws -- confirm this is the intended bookkeeping.
            for (TopicPartition partition : partitions) {
                pausedPartitions.add(partition);
            }
            consumer.pause(partitions);
        } catch (IllegalStateException e) {
            throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
        }
    }

    /**
     * Remove the given partitions from the paused set and resume them on the consumer.
     *
     * @param partitions the partitions to resume
     * @throws IllegalWorkerStateException if this context was created without a consumer, or if
     *         the consumer rejects the request with an {@link IllegalStateException}
     */
    @Override
    public void resume(TopicPartition... partitions) {
        if (consumer == null) {
            throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
        }
        try {
            for (TopicPartition partition : partitions) {
                pausedPartitions.remove(partition);
            }
            consumer.resume(partitions);
        } catch (IllegalStateException e) {
            throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
        }
    }

    /**
     * Get the partitions currently recorded as paused through this context. The returned set is
     * the context's own mutable set, not a copy, so changes made by the caller are visible here.
     *
     * @return the set of paused partitions
     */
    public Set<TopicPartition> pausedPartitions() {
        return pausedPartitions;
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java new file mode 100644 index 0000000..e776f08 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/

package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.ShutdownableThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to
 * simplify testing.
 *
 * <p>The commit bookkeeping fields ({@code committing}, {@code commitSeqno},
 * {@code commitStarted} and {@code commitFailures}) are always read and written while holding
 * this object's monitor, because {@link #onCommitCompleted(Throwable, long)} updates them under
 * that lock. NOTE(review): which thread invokes that callback is not visible from this class.
 */
class WorkerSinkTaskThread extends ShutdownableThread {
    // Log under this class's own name rather than WorkerSinkTask's.
    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTaskThread.class);

    private final WorkerSinkTask task;
    // Only touched by the constructor and by iteration().
    private long nextCommit;
    // Guarded by synchronized (this).
    private boolean committing;
    private int commitSeqno;
    private long commitStarted;
    private int commitFailures;

    /**
     * @param task the sink task this thread drives
     * @param name the thread name, passed to {@link ShutdownableThread}
     * @param time clock used to schedule the first offset commit
     * @param workerConfig configuration supplying the offset commit interval
     */
    public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time,
                                WorkerConfig workerConfig) {
        super(name);
        this.task = task;
        this.nextCommit = time.milliseconds()
                + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
        this.committing = false;
        this.commitSeqno = 0;
        this.commitStarted = -1;
        this.commitFailures = 0;
    }

    /**
     * Joins the consumer group and starts the task, runs {@link #iteration()} for as long as
     * {@code getRunning()} returns true, then performs a final {@code commitOffsets(true, -1)}.
     */
    @Override
    public void execute() {
        // Try to join and start. If we're interrupted before this completes, bail.
        if (!task.joinConsumerGroupAndStart()) {
            return;
        }

        while (getRunning()) {
            iteration();
        }

        // Make sure any uncommitted data has committed
        task.commitOffsets(true, -1);
    }

    /**
     * Runs one pass of the work loop: starts an offset commit if one is due, marks an in-flight
     * commit as failed if it has exceeded the commit timeout, then polls the task until the next
     * commit is due.
     */
    public void iteration() {
        long now = task.time().milliseconds();

        // Maybe commit. The decision and the state change happen atomically under the lock; the
        // call into the task happens outside it so no foreign code runs while the lock is held.
        final boolean commitDue;
        final int seqnoToCommit;
        synchronized (this) {
            commitDue = !committing && now >= nextCommit;
            if (commitDue) {
                committing = true;
                commitSeqno += 1;
                commitStarted = now;
            }
            seqnoToCommit = commitSeqno;
        }
        if (commitDue) {
            task.commitOffsets(false, seqnoToCommit);
            nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
        }

        // Check for timed out commits
        long commitTimeoutMs = task.workerConfig().getLong(
                WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
        synchronized (this) {
            if (committing && now >= commitStarted + commitTimeoutMs) {
                log.warn("Commit of {} offsets timed out", this);
                commitFailures++;
                committing = false;
            }
        }

        // And process messages
        long timeoutMs = Math.max(nextCommit - now, 0);
        task.poll(timeoutMs);
    }

    /**
     * Callback reporting the outcome of an offset commit. A callback whose sequence number is not
     * the most recent one is only logged. Otherwise the failure counter is incremented on error or
     * reset to zero on success, and the in-flight flag is cleared.
     *
     * @param error the failure cause, or {@code null} if the commit succeeded
     * @param seqno the sequence number that was passed to {@code commitOffsets} for this commit
     */
    public void onCommitCompleted(Throwable error, long seqno) {
        synchronized (this) {
            if (commitSeqno != seqno) {
                log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}",
                        this,
                        seqno, commitSeqno);
            } else {
                if (error != null) {
                    log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
                    commitFailures++;
                } else {
                    log.debug("Finished {} offset commit successfully in {} ms",
                            this, task.time().milliseconds() - commitStarted);
                    commitFailures = 0;
                }
                committing = false;
            }
        }
    }

    /**
     * @return the number of commit failures (errors or timeouts) counted since the last successful
     *         commit
     */
    public synchronized int commitFailures() {
        return commitFailures;
    }
}