http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java deleted file mode 100644 index b09cb53..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.errors; - -/** - * Indicates the operation tried to create an entity that already exists. - */ -public class AlreadyExistsException extends CopycatException { - public AlreadyExistsException(String s) { - super(s); - } - - public AlreadyExistsException(String s, Throwable throwable) { - super(s, throwable); - } - - public AlreadyExistsException(Throwable throwable) { - super(throwable); - } -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java deleted file mode 100644 index a8e13a9..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.errors; - -/** - * Indicates that an operation attempted to modify or delete a connector or task that is not present on the worker. - */ -public class NotFoundException extends CopycatException { - public NotFoundException(String s) { - super(s); - } - - public NotFoundException(String s, Throwable throwable) { - super(s, throwable); - } - - public NotFoundException(Throwable throwable) { - super(throwable); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java deleted file mode 100644 index 75821aa..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.errors; - -/** - * An exception that indicates the operation can be reattempted. - */ -public class RetriableException extends CopycatException { - public RetriableException(String s) { - super(s); - } - - public RetriableException(String s, Throwable throwable) { - super(s, throwable); - } - - public RetriableException(Throwable throwable) { - super(throwable); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java deleted file mode 100644 index 2242299..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigDef.Type; - -import java.util.HashMap; -import java.util.Map; - -/** - * <p> - * Configuration options for Connectors. These only include Copycat system-level configuration - * options (e.g. Connector class name, timeouts used by Copycat to control the connector) but does - * not include Connector-specific options (e.g. database connection settings). - * </p> - * <p> - * Note that some of these options are not required for all connectors. For example TOPICS_CONFIG - * is sink-specific. - * </p> - */ -public class ConnectorConfig extends AbstractConfig { - - public static final String NAME_CONFIG = "name"; - private static final String NAME_DOC = "Globally unique name to use for this connector."; - - public static final String CONNECTOR_CLASS_CONFIG = "connector.class"; - private static final String CONNECTOR_CLASS_DOC = - "Name of the class for this connector. Must be a subclass of org.apache.kafka.copycat.connector" - + ".Connector"; - - public static final String TASKS_MAX_CONFIG = "tasks.max"; - private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; - public static final int TASKS_MAX_DEFAULT = 1; - - public static final String TOPICS_CONFIG = "topics"; - private static final String TOPICS_DOC = ""; - public static final String TOPICS_DEFAULT = ""; - - private static ConfigDef config; - - static { - config = new ConfigDef() - .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC) - .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC) - .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC) - .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC); - } - - public ConnectorConfig() { - this(new HashMap<String, String>()); - } - - public ConnectorConfig(Map<String, String> props) { - super(config, props); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java deleted file mode 100644 index 81f0b16..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.runtime.rest.RestServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * This class ties together all the components of a Copycat process (herder, worker, - * storage, command interface), managing their lifecycle. - */ -@InterfaceStability.Unstable -public class Copycat { - private static final Logger log = LoggerFactory.getLogger(Copycat.class); - - private final Worker worker; - private final Herder herder; - private final RestServer rest; - private final CountDownLatch startLatch = new CountDownLatch(1); - private final CountDownLatch stopLatch = new CountDownLatch(1); - private final AtomicBoolean shutdown = new AtomicBoolean(false); - private final ShutdownHook shutdownHook; - - public Copycat(Worker worker, Herder herder, RestServer rest) { - log.debug("Copycat created"); - this.worker = worker; - this.herder = herder; - this.rest = rest; - shutdownHook = new ShutdownHook(); - } - - public void start() { - log.info("Copycat starting"); - Runtime.getRuntime().addShutdownHook(shutdownHook); - - worker.start(); - herder.start(); - rest.start(herder); - - log.info("Copycat started"); - - startLatch.countDown(); - } - - public void stop() { - boolean wasShuttingDown = shutdown.getAndSet(true); - if (!wasShuttingDown) { - log.info("Copycat stopping"); - - rest.stop(); - herder.stop(); - worker.stop(); - - log.info("Copycat stopped"); - } - - stopLatch.countDown(); - } - - public void awaitStop() { - try { - stopLatch.await(); - } catch (InterruptedException e) { - log.error("Interrupted waiting for Copycat to shutdown"); - } - } - - private class ShutdownHook extends Thread { - @Override - public void run() { - try { - startLatch.await(); - Copycat.this.stop(); - } catch (InterruptedException e) { - log.error("Interrupted in shutdown hook while waiting for copycat startup to finish"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java deleted file mode 100644 index 0b03c9a..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo; -import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo; -import org.apache.kafka.copycat.util.Callback; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * <p> - * The herder interface tracks and manages workers and connectors. It is the main interface for external components - * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class - * knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so - * the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one - * of the workers. - * </p> - * <p> - * This class must implement all the actions that can be taken on the cluster (add/remove connectors, pause/resume tasks, - * get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very simple - * wrappers of the functionality provided by this interface. - * </p> - * <p> - * In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case, - * the implementation will mainly be delegating tasks directly to other components. For example, when creating a new - * connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the - * same process, so the standalone herder implementation can immediately instantiate and start the connector and its - * tasks. - * </p> - */ -public interface Herder { - - void start(); - - void stop(); - - /** - * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered - * from the current configuration. However, note - * - * @returns A list of connector names - * @throws org.apache.kafka.copycat.runtime.distributed.NotLeaderException if this node can not resolve the request - * (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is - * also not the leader - * @throws org.apache.kafka.copycat.errors.CopycatException if this node is the leader, but still cannot resolve the - * request (e.g., it is not in sync with other worker's config state) - */ - void connectors(Callback<Collection<String>> callback); - - /** - * Get the definition and status of a connector. - */ - void connectorInfo(String connName, Callback<ConnectorInfo> callback); - - /** - * Get the configuration for a connector. - * @param connName name of the connector - * @param callback callback to invoke with the configuration - */ - void connectorConfig(String connName, Callback<Map<String, String>> callback); - - /** - * Set the configuration for a connector. This supports creation, update, and deletion. - * @param connName name of the connector - * @param config the connectors configuration, or null if deleting the connector - * @param allowReplace if true, allow overwriting previous configs; if false, throw AlreadyExistsException if a connector - * with the same name already exists - * @param callback callback to invoke when the configuration has been written - */ - void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback); - - /** - * Requests reconfiguration of the task. This should only be triggered by - * {@link HerderConnectorContext}. - * - * @param connName name of the connector that should be reconfigured - */ - void requestTaskReconfiguration(String connName); - - /** - * Get the configurations for the current set of tasks of a connector. - * @param connName connector to update - * @param callback callback to invoke upon completion - */ - void taskConfigs(String connName, Callback<List<TaskInfo>> callback); - - /** - * Set the configurations for the tasks of a connector. This should always include all tasks in the connector; if - * there are existing configurations and fewer are provided, this will reduce the number of tasks, and if more are - * provided it will increase the number of tasks. - * @param connName connector to update - * @param configs list of configurations - * @param callback callback to invoke upon completion - */ - void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback); - - - class Created<T> { - private final boolean created; - private final T result; - - public Created(boolean created, T result) { - this.created = created; - this.result = result; - } - - public boolean created() { - return created; - } - - public T result() { - return result; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Created<?> created1 = (Created<?>) o; - return Objects.equals(created, created1.created) && - Objects.equals(result, created1.result); - } - - @Override - public int hashCode() { - return Objects.hash(created, result); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java deleted file mode 100644 index 7a64bd5..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.copycat.connector.ConnectorContext; - -/** - * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks - * in a single process. - */ -public class HerderConnectorContext implements ConnectorContext { - - private Herder herder; - private String connectorName; - - public HerderConnectorContext(Herder herder, String connectorName) { - this.herder = herder; - this.connectorName = connectorName; - } - - @Override - public void requestTaskReconfiguration() { - // This is trivial to forward since there is only one herder and it's in memory in this - // process - herder.requestTaskReconfiguration(connectorName); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java deleted file mode 100644 index 6bb51b9..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -/** - * <p> - * Manages offset commit scheduling and execution for SourceTasks. - * </p> - * <p> - * Unlike sink tasks which directly manage their offset commits in the main poll() thread since - * they drive the event loop and control (for all intents and purposes) the timeouts, source - * tasks are at the whim of the connector and cannot be guaranteed to wake up on the necessary - * schedule. Instead, this class tracks all the active tasks, their schedule for commits, and - * ensures they are invoked in a timely fashion. - * </p> - */ -class SourceTaskOffsetCommitter { - private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class); - - private Time time; - private WorkerConfig config; - private ScheduledExecutorService commitExecutorService = null; - private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>(); - - SourceTaskOffsetCommitter(Time time, WorkerConfig config) { - this.time = time; - this.config = config; - commitExecutorService = Executors.newSingleThreadScheduledExecutor(); - } - - public void close(long timeoutMs) { - commitExecutorService.shutdown(); - try { - if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { - log.error("Graceful shutdown of offset commitOffsets thread timed out."); - } - } catch (InterruptedException e) { - // ignore and allow to exit immediately - } - } - - public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) { - synchronized (committers) { - long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() { - @Override - public void run() { - commit(id, workerTask); - } - }, commitIntervalMs, TimeUnit.MILLISECONDS); - committers.put(id, new ScheduledCommitTask(commitFuture)); - } - } - - public void remove(ConnectorTaskId id) { - final ScheduledCommitTask task; - synchronized (committers) { - task = committers.remove(id); - task.cancelled = true; - task.commitFuture.cancel(false); - } - if (task.finishedLatch != null) { - try { - task.finishedLatch.await(); - } catch (InterruptedException e) { - throw new CopycatException("Unexpected interruption in SourceTaskOffsetCommitter.", e); - } - } - } - - public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) { - final ScheduledCommitTask task; - synchronized (committers) { - task = committers.get(id); - if (task == null || task.cancelled) - return; - task.finishedLatch = new CountDownLatch(1); - } - - try { - log.debug("Committing offsets for {}", workerTask); - boolean success = workerTask.commitOffsets(); - if (!success) { - log.error("Failed to commit offsets for {}", workerTask); - } - } catch (Throwable t) { - // We're very careful about exceptions here since any uncaught exceptions in the commit - // thread would cause the fixed interval schedule on the ExecutorService to stop running - // for that task - log.error("Unhandled exception when committing {}: ", workerTask, t); - } finally { - synchronized (committers) { - task.finishedLatch.countDown(); - if (!task.cancelled) - schedule(id, workerTask); - } - } - } - - private static class ScheduledCommitTask { - ScheduledFuture<?> commitFuture; - boolean cancelled; - CountDownLatch finishedLatch; - - ScheduledCommitTask(ScheduledFuture<?> commitFuture) { - this.commitFuture = commitFuture; - this.cancelled = false; - this.finishedLatch = null; - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java deleted file mode 100644 index be97879..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigDef.Type; - -import java.util.HashMap; -import java.util.Map; - -/** - * <p> - * Configuration options for Tasks. These only include Copycat system-level configuration - * options. - * </p> - */ -public class TaskConfig extends AbstractConfig { - - public static final String TASK_CLASS_CONFIG = "task.class"; - private static final String TASK_CLASS_DOC = - "Name of the class for this task. Must be a subclass of org.apache.kafka.copycat.connector.Task"; - - private static ConfigDef config; - - static { - config = new ConfigDef() - .define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC); - } - - public TaskConfig() { - this(new HashMap<String, String>()); - } - - public TaskConfig(Map<String, ?> props) { - super(config, props); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java deleted file mode 100644 index 91fa175..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java +++ /dev/null @@ -1,331 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.copycat.connector.Connector; -import org.apache.kafka.copycat.connector.ConnectorContext; -import org.apache.kafka.copycat.connector.Task; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.sink.SinkTask; -import org.apache.kafka.copycat.source.SourceTask; -import org.apache.kafka.copycat.storage.*; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * <p> - * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving - * data to/from Kafka. - * </p> - * <p> - * Since each task has a dedicated thread, this is mainly just a container for them. - * </p> - */ -public class Worker { - private static final Logger log = LoggerFactory.getLogger(Worker.class); - - private Time time; - private WorkerConfig config; - private Converter keyConverter; - private Converter valueConverter; - private Converter internalKeyConverter; - private Converter internalValueConverter; - private OffsetBackingStore offsetBackingStore; - private HashMap<String, Connector> connectors = new HashMap<>(); - private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>(); - private KafkaProducer<byte[], byte[]> producer; - private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; - - public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) { - this(new SystemTime(), config, offsetBackingStore); - } - - @SuppressWarnings("unchecked") - public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) { - this.time = time; - this.config = config; - this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); - this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true); - this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); - this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false); - this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class); - this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true); - this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); - this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false); - - this.offsetBackingStore = offsetBackingStore; - this.offsetBackingStore.configure(config.originals()); - } - - public void start() { - log.info("Worker starting"); - - Map<String, Object> producerProps = new HashMap<>(); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerProps.putAll(config.unusedConfigs()); - - producer = new KafkaProducer<>(producerProps); - - offsetBackingStore.start(); - sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config); - - log.info("Worker started"); - } - - public void stop() { - log.info("Worker stopping"); - - long started = time.milliseconds(); - long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); - - for (Map.Entry<String, Connector> entry : connectors.entrySet()) { - Connector conn = entry.getValue(); - log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" + - "Worker is stopped.", conn); - try { - conn.stop(); - } catch (CopycatException e) { - log.error("Error while shutting down connector " + conn, e); - } - } - - for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) { - WorkerTask task = entry.getValue(); - log.warn("Shutting down task {} uncleanly; herder should have shut down " - + "tasks before the Worker is stopped.", task); - try { - task.stop(); - } catch (CopycatException e) { - log.error("Error while shutting down task " + task, e); - } - } - - for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) { - WorkerTask task = entry.getValue(); - log.debug("Waiting for task {} to finish shutting down", task); - if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0))) - log.error("Graceful shutdown of task {} failed.", task); - task.close(); - } - - long timeoutMs = limit - time.milliseconds(); - sourceTaskOffsetCommitter.close(timeoutMs); - - offsetBackingStore.stop(); - - log.info("Worker stopped"); - } - - public WorkerConfig config() { - return config; - } - - /** - * Add a new connector. - * @param connConfig connector configuration - * @param ctx context for the connector - */ - public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) { - String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); - Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - log.info("Creating connector {} of type {}", connName, maybeConnClass.getName()); - - Class<? extends Connector> connClass; - try { - connClass = maybeConnClass.asSubclass(Connector.class); - } catch (ClassCastException e) { - throw new CopycatException("Specified class is not a subclass of Connector: " + maybeConnClass.getName()); - } - - if (connectors.containsKey(connName)) - throw new CopycatException("Connector with name " + connName + " already exists"); - - final Connector connector = instantiateConnector(connClass); - log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName()); - connector.initialize(ctx); - try { - connector.start(connConfig.originalsStrings()); - } catch (CopycatException e) { - throw new CopycatException("Connector threw an exception while starting", e); - } - - connectors.put(connName, connector); - - log.info("Finished creating connector {}", connName); - } - - private static Connector instantiateConnector(Class<? extends Connector> connClass) { - try { - return Utils.newInstance(connClass); - } catch (Throwable t) { - // Catches normal exceptions due to instantiation errors as well as any runtime errors that - // may be caused by user code - throw new CopycatException("Failed to create connector instance", t); - } - } - - public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) { - log.trace("Reconfiguring connector tasks for {}", connName); - - Connector connector = connectors.get(connName); - if (connector == null) - throw new CopycatException("Connector " + connName + " not found in this worker."); - - List<Map<String, String>> result = new ArrayList<>(); - String taskClassName = connector.taskClass().getName(); - for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) { - Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config - taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); - if (sinkTopics != null) - taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); - result.add(taskConfig); - } - return result; - } - - public void stopConnector(String connName) { - log.info("Stopping connector {}", connName); - - Connector connector = connectors.get(connName); - if (connector == null) - throw new CopycatException("Connector " + connName + " not found in this worker."); - - try { - connector.stop(); - } catch (CopycatException e) { - log.error("Error shutting down connector {}: ", connector, e); - } - - connectors.remove(connName); - - log.info("Stopped connector {}", connName); - } - - /** - * Get the IDs of the connectors currently running in this worker. - */ - public Set<String> connectorNames() { - return connectors.keySet(); - } - - /** - * Add a new task. - * @param id Globally unique ID for this task. - * @param taskConfig the parsed task configuration - */ - public void addTask(ConnectorTaskId id, TaskConfig taskConfig) { - log.info("Creating task {}", id); - - if (tasks.containsKey(id)) { - String msg = "Task already exists in this worker; the herder should not have requested " - + "that this : " + id; - log.error(msg); - throw new CopycatException(msg); - } - - Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); - final Task task = instantiateTask(taskClass); - log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); - - // Decide which type of worker task we need based on the type of task. - final WorkerTask workerTask; - if (task instanceof SourceTask) { - SourceTask sourceTask = (SourceTask) task; - OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), - internalKeyConverter, internalValueConverter); - OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), - internalKeyConverter, internalValueConverter); - workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer, - offsetReader, offsetWriter, config, time); - } else if (task instanceof SinkTask) { - workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time); - } else { - log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); - throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask"); - } - - // Start the task before adding modifying any state, any exceptions are caught higher up the - // call chain and there's no cleanup to do here - workerTask.start(taskConfig.originalsStrings()); - if (task instanceof SourceTask) { - WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask; - sourceTaskOffsetCommitter.schedule(id, workerSourceTask); - } - tasks.put(id, workerTask); - } - - private static Task instantiateTask(Class<? extends Task> taskClass) { - try { - return Utils.newInstance(taskClass); - } catch (KafkaException e) { - throw new CopycatException("Task class not found", e); - } - } - - public void stopTask(ConnectorTaskId id) { - log.info("Stopping task {}", id); - - WorkerTask task = getTask(id); - if (task instanceof WorkerSourceTask) - sourceTaskOffsetCommitter.remove(id); - task.stop(); - if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) - log.error("Graceful stop of task {} failed.", task); - task.close(); - tasks.remove(id); - } - - /** - * Get the IDs of the tasks currently running in this worker. - */ - public Set<ConnectorTaskId> taskIds() { - return tasks.keySet(); - } - - private WorkerTask getTask(ConnectorTaskId id) { - WorkerTask task = tasks.get(id); - if (task == null) { - log.error("Task not found: " + id); - throw new CopycatException("Task not found: " + id); - } - return task; - } - - public Converter getInternalKeyConverter() { - return internalKeyConverter; - } - - public Converter getInternalValueConverter() { - return internalValueConverter; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java deleted file mode 100644 index b962d54..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigDef.Type; - -import java.util.Map; - -/** - * Common base class providing configuration for Copycat workers, whether standalone or distributed. - */ -@InterfaceStability.Unstable -public class WorkerConfig extends AbstractConfig { - - public static final String CLUSTER_CONFIG = "cluster"; - private static final String CLUSTER_CONFIG_DOC = - "ID for this cluster, which is used to provide a namespace so multiple Copycat clusters " - + "or instances may co-exist while sharing a single Kafka cluster."; - public static final String CLUSTER_DEFAULT = "copycat"; - - public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; - public static final String BOOTSTRAP_SERVERS_DOC - = "A list of host/port pairs to use for establishing the initial connection to the Kafka " - + "cluster. The client will make use of all servers irrespective of which servers are " - + "specified here for bootstrapping—this list only impacts the initial hosts used " - + "to discover the full set of servers. This list should be in the form " - + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the " - + "initial connection to discover the full cluster membership (which may change " - + "dynamically), this list need not contain the full set of servers (you may want more " - + "than one, though, in case a server is down)."; - public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; - - public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter"; - public static final String KEY_CONVERTER_CLASS_DOC = - "Converter class for key Copycat data that implements the <code>Converter</code> interface."; - - public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter"; - public static final String VALUE_CONVERTER_CLASS_DOC = - "Converter class for value Copycat data that implements the <code>Converter</code> interface."; - - public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter"; - public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC = - "Converter class for internal key Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs."; - - public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter"; - public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC = - "Converter class for offset value Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs."; - - public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG - = "task.shutdown.graceful.timeout.ms"; - private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC = - "Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time," - + " not per task. All task have shutdown triggered, then they are waited on sequentially."; - private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000"; - - public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms"; - private static final String OFFSET_COMMIT_INTERVAL_MS_DOC - = "Interval at which to try committing offsets for tasks."; - public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L; - - public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms"; - private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC - = "Maximum number of milliseconds to wait for records to flush and partition offset data to be" - + " committed to offset storage before cancelling the process and restoring the offset " - + "data to be committed in a future attempt."; - public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L; - - public static final String REST_HOST_NAME_CONFIG = "rest.host.name"; - private static final String REST_HOST_NAME_DOC - = "Hostname for the REST API. If this is set, it will only bind to this interface."; - - public static final String REST_PORT_CONFIG = "rest.port"; - private static final String REST_PORT_DOC - = "Port for the REST API to listen on."; - public static final int REST_PORT_DEFAULT = 8083; - - public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name"; - private static final String REST_ADVERTISED_HOST_NAME_DOC - = "If this is set, this is the hostname that will be given out to other workers to connect to."; - - public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port"; - private static final String REST_ADVERTISED_PORT_DOC - = "If this is set, this is the port that will be given out to other workers to connect to."; - - /** - * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to - * bootstrap their own ConfigDef. - * @return a ConfigDef with all the common options specified - */ - protected static ConfigDef baseConfigDef() { - return new ConfigDef() - .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC) - .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT, - Importance.HIGH, BOOTSTRAP_SERVERS_DOC) - .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, KEY_CONVERTER_CLASS_DOC) - .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) - .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC) - .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC) - .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, - TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, - TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) - .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT, - Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC) - .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, - Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC) - .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC) - .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC) - .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC) - .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC); - } - - public WorkerConfig(ConfigDef definition, Map<String, String> props) { - super(definition, props); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java deleted file mode 100644 index ad6d872..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ /dev/null @@ -1,370 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.copycat.data.SchemaAndValue; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.RetriableException; -import org.apache.kafka.copycat.sink.SinkRecord; -import org.apache.kafka.copycat.sink.SinkTask; -import org.apache.kafka.copycat.storage.Converter; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * WorkerTask that uses a SinkTask to export data from Kafka. - */ -class WorkerSinkTask implements WorkerTask { - private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); - - private final ConnectorTaskId id; - private final SinkTask task; - private final WorkerConfig workerConfig; - private final Time time; - private final Converter keyConverter; - private final Converter valueConverter; - private WorkerSinkTaskThread workThread; - private Map<String, String> taskProps; - private KafkaConsumer<byte[], byte[]> consumer; - private WorkerSinkTaskContext context; - private boolean started; - private final List<SinkRecord> messageBatch; - private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets; - private Map<TopicPartition, OffsetAndMetadata> currentOffsets; - private boolean pausedForRedelivery; - - public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, - Converter keyConverter, Converter valueConverter, Time time) { - this.id = id; - this.task = task; - this.workerConfig = workerConfig; - this.keyConverter = keyConverter; - this.valueConverter = valueConverter; - this.time = time; - this.started = false; - this.messageBatch = new ArrayList<>(); - this.currentOffsets = new HashMap<>(); - this.pausedForRedelivery = false; - } - - @Override - public void start(Map<String, String> props) { - taskProps = props; - consumer = createConsumer(); - context = new WorkerSinkTaskContext(consumer); - - workThread = createWorkerThread(); - workThread.start(); - } - - @Override - public void stop() { - // Offset commit is handled upon exit in work thread - if (workThread != null) - workThread.startGracefulShutdown(); - consumer.wakeup(); - } - - @Override - public boolean awaitStop(long timeoutMs) { - boolean success = true; - if (workThread != null) { - try { - success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); - if (!success) - workThread.forceShutdown(); - } catch (InterruptedException e) { - success = false; - } - } - task.stop(); - return success; - } - - @Override - public void close() { - // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout - // passed in - if (consumer != null) - consumer.close(); - } - - /** - * Preforms initial join process for consumer group, ensures we have an assignment, and initializes + starts the - * SinkTask. - * - * @returns true if successful, false if joining the consumer group was interrupted - */ - public boolean joinConsumerGroupAndStart() { - String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG); - if (topicsStr == null || topicsStr.isEmpty()) - throw new CopycatException("Sink tasks require a list of topics."); - String[] topics = topicsStr.split(","); - log.debug("Task {} subscribing to topics {}", id, topics); - consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); - - // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions - // to work with. Any rewinding will be handled immediately when polling starts. - try { - consumer.poll(0); - } catch (WakeupException e) { - log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this); - return false; - } - task.initialize(context); - task.start(taskProps); - log.info("Sink task {} finished initialization and start", this); - started = true; - return true; - } - - /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ - public void poll(long timeoutMs) { - try { - rewind(); - long retryTimeout = context.timeout(); - if (retryTimeout > 0) { - timeoutMs = Math.min(timeoutMs, retryTimeout); - context.timeout(-1L); - } - - log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); - ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs); - assert messageBatch.isEmpty() || msgs.isEmpty(); - log.trace("{} polling returned {} messages", id, msgs.count()); - - convertMessages(msgs); - deliverMessages(); - } catch (WakeupException we) { - log.trace("{} consumer woken up", id); - } - } - - /** - * Starts an offset commit by flushing outstanding messages from the task and then starting - * the write commit. This should only be invoked by the WorkerSinkTaskThread. - **/ - public void commitOffsets(boolean sync, final int seqno) { - log.info("{} Committing offsets", this); - - final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets); - - try { - task.flush(offsets); - } catch (Throwable t) { - log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t); - log.error("Rewinding offsets to last committed offsets"); - for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) { - log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); - consumer.seek(entry.getKey(), entry.getValue().offset()); - } - currentOffsets = new HashMap<>(lastCommittedOffsets); - workThread.onCommitCompleted(t, seqno); - return; - } - - if (sync) { - try { - consumer.commitSync(offsets); - lastCommittedOffsets = offsets; - workThread.onCommitCompleted(null, seqno); - } catch (KafkaException e) { - workThread.onCommitCompleted(e, seqno); - } - } else { - OffsetCommitCallback cb = new OffsetCommitCallback() { - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) { - lastCommittedOffsets = offsets; - workThread.onCommitCompleted(error, seqno); - } - }; - consumer.commitAsync(offsets, cb); - } - } - - public Time time() { - return time; - } - - public WorkerConfig workerConfig() { - return workerConfig; - } - - private KafkaConsumer<byte[], byte[]> createConsumer() { - // Include any unknown worker configs so consumer configs can be set globally on the worker - // and through to the task - Map<String, Object> props = workerConfig.unusedConfigs(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector()); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - - KafkaConsumer<byte[], byte[]> newConsumer; - try { - newConsumer = new KafkaConsumer<>(props); - } catch (Throwable t) { - throw new CopycatException("Failed to create consumer", t); - } - - return newConsumer; - } - - private WorkerSinkTaskThread createWorkerThread() { - return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig); - } - - private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) { - for (ConsumerRecord<byte[], byte[]> msg : msgs) { - log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); - SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key()); - SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value()); - messageBatch.add( - new SinkRecord(msg.topic(), msg.partition(), - keyAndSchema.schema(), keyAndSchema.value(), - valueAndSchema.schema(), valueAndSchema.value(), - msg.offset()) - ); - } - } - - private void deliverMessages() { - // Finally, deliver this batch to the sink - try { - // Since we reuse the messageBatch buffer, ensure we give the task its own copy - task.put(new ArrayList<>(messageBatch)); - for (SinkRecord record : messageBatch) - currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()), - new OffsetAndMetadata(record.kafkaOffset() + 1)); - messageBatch.clear(); - // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that - // the task had not explicitly paused - if (pausedForRedelivery) { - for (TopicPartition tp : consumer.assignment()) - if (!context.pausedPartitions().contains(tp)) - consumer.resume(tp); - pausedForRedelivery = false; - } - } catch (RetriableException e) { - log.error("RetriableException from SinkTask {}: {}", id, e); - // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data, - // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc. - pausedForRedelivery = true; - for (TopicPartition tp : consumer.assignment()) - consumer.pause(tp); - // Let this exit normally, the batch will be reprocessed on the next loop. - } catch (Throwable t) { - log.error("Task {} threw an uncaught and unrecoverable exception", id); - log.error("Task is being killed and will not recover until manually restarted:", t); - throw new CopycatException("Exiting WorkerSinkTask due to unrecoverable exception."); - } - } - - private void rewind() { - Map<TopicPartition, Long> offsets = context.offsets(); - if (offsets.isEmpty()) { - return; - } - for (TopicPartition tp: offsets.keySet()) { - Long offset = offsets.get(tp); - if (offset != null) { - log.trace("Rewind {} to offset {}.", tp, offset); - consumer.seek(tp, offset); - lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset)); - currentOffsets.put(tp, new OffsetAndMetadata(offset)); - } - } - context.clearOffsets(); - } - - private class HandleRebalance implements ConsumerRebalanceListener { - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - lastCommittedOffsets = new HashMap<>(); - currentOffsets = new HashMap<>(); - for (TopicPartition tp : partitions) { - long pos = consumer.position(tp); - lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos)); - currentOffsets.put(tp, new OffsetAndMetadata(pos)); - log.debug("{} assigned topic partition {} with offset {}", id, tp, pos); - } - - // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make - // sure anything we paused that the task didn't request to be paused *and* which we still own is resumed. - // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own. - if (pausedForRedelivery) { - pausedForRedelivery = false; - Set<TopicPartition> assigned = new HashSet<>(partitions); - Set<TopicPartition> taskPaused = context.pausedPartitions(); - - for (TopicPartition tp : partitions) { - if (!taskPaused.contains(tp)) - consumer.resume(tp); - } - - Iterator<TopicPartition> tpIter = taskPaused.iterator(); - while (tpIter.hasNext()) { - TopicPartition tp = tpIter.next(); - if (assigned.contains(tp)) - tpIter.remove(); - } - } - - // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon - // task start. Since this callback gets invoked during that initial setup before we've started the task, we - // need to guard against invoking the user's callback method during that period. - if (started) - task.onPartitionsAssigned(partitions); - } - - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - task.onPartitionsRevoked(partitions); - commitOffsets(true, -1); - // Make sure we don't have any leftover data since offsets will be reset to committed positions - messageBatch.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java deleted file mode 100644 index 5257ee4..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/> Unless required by - * applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.copycat.errors.IllegalWorkerStateException; -import org.apache.kafka.copycat.sink.SinkTaskContext; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -public class WorkerSinkTaskContext implements SinkTaskContext { - private Map<TopicPartition, Long> offsets; - private long timeoutMs; - private KafkaConsumer<byte[], byte[]> consumer; - private final Set<TopicPartition> pausedPartitions; - - public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) { - this.offsets = new HashMap<>(); - this.timeoutMs = -1L; - this.consumer = consumer; - this.pausedPartitions = new HashSet<>(); - } - - @Override - public void offset(Map<TopicPartition, Long> offsets) { - this.offsets.putAll(offsets); - } - - @Override - public void offset(TopicPartition tp, long offset) { - offsets.put(tp, offset); - } - - public void clearOffsets() { - offsets.clear(); - } - - /** - * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework. - * @return the map of offsets - */ - public Map<TopicPartition, Long> offsets() { - return offsets; - } - - @Override - public void timeout(long timeoutMs) { - this.timeoutMs = timeoutMs; - } - - /** - * Get the timeout in milliseconds set by SinkTasks. Used by the Copycat framework. - * @return the backoff timeout in milliseconds. - */ - public long timeout() { - return timeoutMs; - } - - @Override - public Set<TopicPartition> assignment() { - if (consumer == null) { - throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized"); - } - return consumer.assignment(); - } - - @Override - public void pause(TopicPartition... partitions) { - if (consumer == null) { - throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized"); - } - try { - for (TopicPartition partition : partitions) - pausedPartitions.add(partition); - consumer.pause(partitions); - } catch (IllegalStateException e) { - throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e); - } - } - - @Override - public void resume(TopicPartition... partitions) { - if (consumer == null) { - throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized"); - } - try { - for (TopicPartition partition : partitions) - pausedPartitions.remove(partition); - consumer.resume(partitions); - } catch (IllegalStateException e) { - throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e); - } - } - - public Set<TopicPartition> pausedPartitions() { - return pausedPartitions; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java deleted file mode 100644 index ab3f1fe..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.copycat.util.ShutdownableThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to - * simplify testing. - */ -class WorkerSinkTaskThread extends ShutdownableThread { - private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); - - private final WorkerSinkTask task; - private long nextCommit; - private boolean committing; - private int commitSeqno; - private long commitStarted; - private int commitFailures; - - public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, - WorkerConfig workerConfig) { - super(name); - this.task = task; - this.nextCommit = time.milliseconds() + - workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - this.committing = false; - this.commitSeqno = 0; - this.commitStarted = -1; - this.commitFailures = 0; - } - - @Override - public void execute() { - // Try to join and start. If we're interrupted before this completes, bail. - if (!task.joinConsumerGroupAndStart()) - return; - - while (getRunning()) { - iteration(); - } - - // Make sure any uncommitted data has committed - task.commitOffsets(true, -1); - } - - public void iteration() { - long now = task.time().milliseconds(); - - // Maybe commit - if (!committing && now >= nextCommit) { - synchronized (this) { - committing = true; - commitSeqno += 1; - commitStarted = now; - } - task.commitOffsets(false, commitSeqno); - nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - } - - // Check for timed out commits - long commitTimeout = commitStarted + task.workerConfig().getLong( - WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); - if (committing && now >= commitTimeout) { - log.warn("Commit of {} offsets timed out", this); - commitFailures++; - committing = false; - } - - // And process messages - long timeoutMs = Math.max(nextCommit - now, 0); - task.poll(timeoutMs); - } - - public void onCommitCompleted(Throwable error, long seqno) { - synchronized (this) { - if (commitSeqno != seqno) { - log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", - this, - seqno, commitSeqno); - } else { - if (error != null) { - log.error("Commit of {} offsets threw an unexpected exception: ", this, error); - commitFailures++; - } else { - log.debug("Finished {} offset commit successfully in {} ms", - this, task.time().milliseconds() - commitStarted); - commitFailures = 0; - } - committing = false; - } - } - } - - public int commitFailures() { - return commitFailures; - } -}