http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
new file mode 100644
index 0000000..7f8b7c2
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.util.Callback;
+
+import java.util.Properties;
+
+/**
+ * <p>
+ * The herder interface tracks and manages workers and connectors. It is the main interface for external components
+ * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class
+ * knows how to accept a connector configuration, route it if necessary to the current leader worker for the cluster
+ * so the config can be written to persistent storage, and then ensure the new connector is correctly instantiated
+ * on one of the workers.
+ * </p>
+ * <p>
+ * Implementations of this interface must support all the actions that can be taken on the cluster (add/remove
+ * connectors, pause/resume tasks, get state of connectors and tasks, etc.). The non-Java interfaces to the cluster
+ * (REST API and CLI) are very simple wrappers of the functionality provided by this interface.
+ * </p>
+ * <p>
+ * In standalone mode, the implementation of this class will be trivial because no coordination is needed. In that
+ * case, the implementation will mainly be delegating directly to other components. For example, when creating a new
+ * connector in standalone mode, there is no need to persist the config, and the connector and its tasks must run in
+ * the same process, so the standalone herder implementation can immediately instantiate and start the connector and
+ * its tasks.
+ * </p>
+ */
+public interface Herder {
+
+    void start();
+
+    void stop();
+
+    /**
+     * Submit a connector job to the cluster. This works from any node by forwarding the request to
+     * the leader herder if necessary.
+     *
+     * @param connectorProps user-specified properties for this job
+     * @param callback callback to invoke when the request completes
+     */
+    void addConnector(Properties connectorProps, Callback<String> callback);
+
+    /**
+     * Delete a connector job by name.
+     *
+     * @param name name of the connector job to shut down and delete
+     * @param callback callback to invoke when the request completes
+     */
+    void deleteConnector(String name, Callback<Void> callback);
+}
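A caller such as a standalone CLI would drive this interface roughly as follows. This is a sketch: the property keys, the connector class name, and the started `herder` instance are assumed for illustration; only the Herder and Callback types come from the code above.

    Properties connectorProps = new Properties();
    connectorProps.setProperty("name", "local-file-source");   // hypothetical job name
    connectorProps.setProperty("connector.class",
            "org.apache.kafka.copycat.file.FileStreamSourceConnector");   // hypothetical connector
    final CountDownLatch created = new CountDownLatch(1);   // java.util.concurrent
    herder.addConnector(connectorProps, new Callback<String>() {
        @Override
        public void onCompletion(Throwable error, String connName) {
            // Callback#onCompletion(Throwable, V) per org.apache.kafka.copycat.util.Callback
            if (error != null)
                System.err.println("Failed to create connector: " + error);
            else
                System.out.println("Created connector " + connName);
            created.countDown();
        }
    });
    created.await();   // block until the herder has processed the request
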
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java new file mode 100644 index 0000000..f47c984 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.copycat.sink.SinkTaskContext; + +class SinkTaskContextImpl extends SinkTaskContext { + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java new file mode 100644 index 0000000..953cfa5 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.copycat.cli.WorkerConfig; +import org.apache.kafka.copycat.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * <p> + * Manages offset commit scheduling and execution for SourceTasks. 
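+ * Commit frequency is governed by {@link WorkerConfig#OFFSET_COMMIT_INTERVAL_MS_CONFIG}, and a
+ * single scheduled executor thread services all registered tasks.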
+ * </p>
+ * <p>
+ * Unlike sink tasks, which manage their offset commits directly in the main poll() thread (they
+ * drive the event loop and, for all intents and purposes, control the timeouts), source tasks
+ * are at the whim of the connector and cannot be guaranteed to wake up on the necessary
+ * schedule. Instead, this class tracks all the active tasks, their schedule for commits, and
+ * ensures they are invoked in a timely fashion.
+ * </p>
+ * <p>
+ * The current implementation uses a single thread to process all tasks' commits sequentially.
+ * </p>
+ */
+class SourceTaskOffsetCommitter {
+    private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
+
+    private Time time;
+    private WorkerConfig config;
+    private ScheduledExecutorService commitExecutorService = null;
+    private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new HashMap<>();
+
+    SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
+        this.time = time;
+        this.config = config;
+        commitExecutorService = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    public void close(long timeoutMs) {
+        commitExecutorService.shutdown();
+        try {
+            if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+                log.error("Graceful shutdown of the offset commit thread timed out.");
+            }
+        } catch (InterruptedException e) {
+            // ignore and allow to exit immediately
+        }
+    }
+
+    public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
+        long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+        ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                commit(workerTask);
+            }
+        }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
+        committers.put(id, commitFuture);
+    }
+
+    public void remove(ConnectorTaskId id) {
+        ScheduledFuture<?> commitFuture = committers.remove(id);
+        if (commitFuture != null)
+            commitFuture.cancel(false);
+    }
+
+    public void commit(WorkerSourceTask workerTask) {
+        try {
+            log.debug("Committing offsets for {}", workerTask);
+            boolean success = workerTask.commitOffsets();
+            if (!success) {
+                log.error("Failed to commit offsets for {}", workerTask);
+            }
+        } catch (Throwable t) {
+            // We're very careful about exceptions here since any uncaught exceptions in the commit
+            // thread would cause the fixed interval schedule on the ExecutorService to stop running
+            // for that task
+            log.error("Unhandled exception when committing {}: ", workerTask, t);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
new file mode 100644
index 0000000..55d0784
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.copycat.cli.WorkerConfig; +import org.apache.kafka.copycat.connector.Task; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.sink.SinkTask; +import org.apache.kafka.copycat.source.SourceTask; +import org.apache.kafka.copycat.storage.*; +import org.apache.kafka.copycat.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * <p> + * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving + * data to/from Kafka. + * </p> + * <p> + * Since each task has a dedicated thread, this is mainly just a container for them. + * </p> + */ +public class Worker<K, V> { + private static final Logger log = LoggerFactory.getLogger(Worker.class); + + private Time time; + private WorkerConfig config; + private Converter<K> keyConverter; + private Converter<V> valueConverter; + private OffsetBackingStore offsetBackingStore; + private Serializer<K> offsetKeySerializer; + private Serializer<V> offsetValueSerializer; + private Deserializer<K> offsetKeyDeserializer; + private Deserializer<V> offsetValueDeserializer; + private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>(); + private KafkaProducer<K, V> producer; + private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; + + public Worker(WorkerConfig config) { + this(new SystemTime(), config, null, null, null, null, null); + } + + @SuppressWarnings("unchecked") + public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore, + Serializer offsetKeySerializer, Serializer offsetValueSerializer, + Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) { + this.time = time; + this.config = config; + this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); + this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + + if (offsetBackingStore != null) { + this.offsetBackingStore = offsetBackingStore; + } else { + this.offsetBackingStore = new FileOffsetBackingStore(); + this.offsetBackingStore.configure(config.originals()); + } + + if (offsetKeySerializer != null) { + this.offsetKeySerializer = offsetKeySerializer; + } else { + this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); + this.offsetKeySerializer.configure(config.originals(), true); + } + + if (offsetValueSerializer != null) { + this.offsetValueSerializer = offsetValueSerializer; + } else { + this.offsetValueSerializer = 
config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+            this.offsetValueSerializer.configure(config.originals(), false);
+        }
+
+        if (offsetKeyDeserializer != null) {
+            this.offsetKeyDeserializer = offsetKeyDeserializer;
+        } else {
+            this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+            this.offsetKeyDeserializer.configure(config.originals(), true);
+        }
+
+        if (offsetValueDeserializer != null) {
+            this.offsetValueDeserializer = offsetValueDeserializer;
+        } else {
+            this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+            this.offsetValueDeserializer.configure(config.originals(), false);
+        }
+    }
+
+    public void start() {
+        log.info("Worker starting");
+
+        Properties unusedConfigs = config.getUnusedProperties();
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
+        for (String propName : unusedConfigs.stringPropertyNames()) {
+            producerProps.put(propName, unusedConfigs.getProperty(propName));
+        }
+        producer = new KafkaProducer<>(producerProps);
+
+        offsetBackingStore.start();
+        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);
+
+        log.info("Worker started");
+    }
+
+    public void stop() {
+        log.info("Worker stopping");
+
+        long started = time.milliseconds();
+        long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+
+        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+            WorkerTask task = entry.getValue();
+            log.warn("Shutting down task {} uncleanly; herder should have shut down "
+                    + "tasks before the Worker is stopped.", task);
+            try {
+                task.stop();
+            } catch (CopycatException e) {
+                log.error("Error while shutting down task " + task, e);
+            }
+        }
+
+        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+            WorkerTask task = entry.getValue();
+            log.debug("Waiting for task {} to finish shutting down", task);
+            if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
+                log.error("Graceful shutdown of task {} failed.", task);
+            task.close();
+        }
+
+        long timeoutMs = limit - time.milliseconds();
+        sourceTaskOffsetCommitter.close(timeoutMs);
+
+        offsetBackingStore.stop();
+
+        log.info("Worker stopped");
+    }
+
+    /**
+     * Add a new task.
+     * @param id Globally unique ID for this task.
+     * @param taskClassName name of the {@link org.apache.kafka.copycat.connector.Task}
+     *                      class to instantiate. Must be a subclass of either
+     *                      {@link org.apache.kafka.copycat.source.SourceTask} or
+     *                      {@link org.apache.kafka.copycat.sink.SinkTask}.
+     * @param props configuration options for the task
+     */
+    public void addTask(ConnectorTaskId id, String taskClassName, Properties props) {
+        if (tasks.containsKey(id)) {
+            String msg = "Task already exists in this worker; the herder should not have requested "
+                    + "creation of this task: " + id;
+            log.error(msg);
+            throw new CopycatException(msg);
+        }
+
+        final Task task = instantiateTask(taskClassName);
+
+        // Decide which type of worker task we need based on the type of task.
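+        // Source tasks get producer and offset-storage plumbing so they can write to Kafka and
+        // checkpoint source offsets; sink tasks instead get a consumer-driven poll loop.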
+        final WorkerTask workerTask;
+        if (task instanceof SourceTask) {
+            SourceTask sourceTask = (SourceTask) task;
+            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.getConnector(),
+                    keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
+            OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.getConnector(),
+                    keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
+            workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
+                    offsetReader, offsetWriter, config, time);
+        } else if (task instanceof SinkTask) {
+            workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+        } else {
+            log.error("Tasks must be a subclass of either SourceTask or SinkTask: {}", task);
+            throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");
+        }
+
+        // Start the task before modifying any state; any exceptions are caught higher up the
+        // call chain and there's no cleanup to do here
+        workerTask.start(props);
+        tasks.put(id, workerTask);
+    }
+
+    private static Task instantiateTask(String taskClassName) {
+        try {
+            return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class));
+        } catch (ClassNotFoundException e) {
+            throw new CopycatException("Task class not found", e);
+        }
+    }
+
+    public void stopTask(ConnectorTaskId id) {
+        WorkerTask task = getTask(id);
+        task.stop();
+        if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
+            log.error("Graceful stop of task {} failed.", task);
+        task.close();
+        tasks.remove(id);
+    }
+
+    private WorkerTask getTask(ConnectorTaskId id) {
+        WorkerTask task = tasks.get(id);
+        if (task == null) {
+            log.error("Task not found: " + id);
+            throw new CopycatException("Task not found: " + id);
+        }
+        return task;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
new file mode 100644
index 0000000..4eaf756
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.copycat.cli.WorkerConfig; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.sink.SinkRecord; +import org.apache.kafka.copycat.sink.SinkTask; +import org.apache.kafka.copycat.sink.SinkTaskContext; +import org.apache.kafka.copycat.storage.Converter; +import org.apache.kafka.copycat.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * WorkerTask that uses a SinkTask to export data from Kafka. + */ +class WorkerSinkTask<K, V> implements WorkerTask { + private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); + + private final ConnectorTaskId id; + private final SinkTask task; + private final WorkerConfig workerConfig; + private final Time time; + private final Converter<K> keyConverter; + private final Converter<V> valueConverter; + private WorkerSinkTaskThread workThread; + private KafkaConsumer<K, V> consumer; + private final SinkTaskContext context; + + public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, + Converter<K> keyConverter, Converter<V> valueConverter, Time time) { + this.id = id; + this.task = task; + this.workerConfig = workerConfig; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + context = new SinkTaskContextImpl(); + this.time = time; + } + + @Override + public void start(Properties props) { + task.initialize(context); + task.start(props); + consumer = createConsumer(props); + workThread = createWorkerThread(); + workThread.start(); + } + + @Override + public void stop() { + // Offset commit is handled upon exit in work thread + task.stop(); + if (workThread != null) + workThread.startGracefulShutdown(); + consumer.wakeup(); + } + + @Override + public boolean awaitStop(long timeoutMs) { + if (workThread != null) { + try { + boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); + if (!success) + workThread.forceShutdown(); + return success; + } catch (InterruptedException e) { + return false; + } + } + return true; + } + + @Override + public void close() { + // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout + // passed in + if (consumer != null) + consumer.close(); + } + + /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ + public void poll(long timeoutMs) { + try { + log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); + ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs); + log.trace("{} polling returned {} messages", id, msgs.count()); + deliverMessages(msgs); + } catch (ConsumerWakeupException we) { + log.trace("{} consumer woken up", id); + } + } + + /** + * Starts an offset commit by flushing outstanding messages from the task and then starting + * the write commit. This should only be invoked by the WorkerSinkTaskThread. 
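+     * <p>
+     * The {@code seqno} passed in here is echoed back through
+     * {@link WorkerSinkTaskThread#onCommitCompleted}, which lets the thread discard completion
+     * callbacks for commits it has already declared timed out.
+     * </p>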
+     **/
+    public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
+        HashMap<TopicPartition, Long> offsets = new HashMap<>();
+        for (TopicPartition tp : consumer.subscriptions()) {
+            offsets.put(tp, consumer.position(tp));
+        }
+        // There is only one case in which we don't flush the task: when shutting down, the task
+        // has already been stopped and all data should have already been flushed
+        if (flush) {
+            try {
+                task.flush(offsets);
+            } catch (Throwable t) {
+                log.error("Commit of {} offsets failed due to exception while flushing: ", this, t);
+                workThread.onCommitCompleted(t, seqno);
+                return;
+            }
+        }
+
+        ConsumerCommitCallback cb = new ConsumerCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
+                workThread.onCommitCompleted(error, seqno);
+            }
+        };
+        consumer.commit(offsets, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
+    }
+
+    public Time getTime() {
+        return time;
+    }
+
+    public WorkerConfig getWorkerConfig() {
+        return workerConfig;
+    }
+
+    private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
+        String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+        if (topicsStr == null || topicsStr.isEmpty())
+            throw new CopycatException("Sink tasks require a list of topics.");
+        String[] topics = topicsStr.split(",");
+
+        // Include any unknown worker configs so consumer configs can be set globally on the worker
+        // and passed through to the task
+        Properties props = workerConfig.getUnusedProperties();
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.toString());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                workerConfig.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
+
+        KafkaConsumer<K, V> newConsumer;
+        try {
+            newConsumer = new KafkaConsumer<>(props);
+        } catch (Throwable t) {
+            throw new CopycatException("Failed to create consumer", t);
+        }
+
+        log.debug("Task {} subscribing to topics {}", id, topics);
+        newConsumer.subscribe(topics);
+
+        // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
+        // enable exactly once delivery to that system).
+        //
+        // To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee.
+        // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
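+        //
+        // For example, a task that tracks delivery in the sink system itself can return
+        // {topicPartition -> 1042} from context.getOffsets(), and the seek() below makes
+        // consumption resume at offset 1042 regardless of what is committed in Kafka's own
+        // offset storage.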
+ newConsumer.poll(0); + Map<TopicPartition, Long> offsets = context.getOffsets(); + for (TopicPartition tp : newConsumer.subscriptions()) { + Long offset = offsets.get(tp); + if (offset != null) + newConsumer.seek(tp, offset); + } + return newConsumer; + } + + private WorkerSinkTaskThread createWorkerThread() { + return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig); + } + + private void deliverMessages(ConsumerRecords<K, V> msgs) { + // Finally, deliver this batch to the sink + if (msgs.count() > 0) { + List<SinkRecord> records = new ArrayList<>(); + for (ConsumerRecord<K, V> msg : msgs) { + log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); + records.add( + new SinkRecord(msg.topic(), msg.partition(), + keyConverter.toCopycatData(msg.key()), + valueConverter.toCopycatData(msg.value()), + msg.offset()) + ); + } + + try { + task.put(records); + } catch (CopycatException e) { + log.error("Exception from SinkTask {}: ", id, e); + } catch (Throwable t) { + log.error("Unexpected exception from SinkTask {}: ", id, t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java new file mode 100644 index 0000000..b946343 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.copycat.cli.WorkerConfig; +import org.apache.kafka.copycat.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to + * simplify testing. 
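+ * <p>
+ * Each iteration of the work loop starts an asynchronous offset commit once the commit interval
+ * has elapsed, expires any commit outstanding longer than
+ * {@link WorkerConfig#OFFSET_COMMIT_TIMEOUT_MS_CONFIG}, and then polls the task's consumer for
+ * more records.
+ * </p>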
+ */ +class WorkerSinkTaskThread extends ShutdownableThread { + private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); + + private final WorkerSinkTask task; + private long nextCommit; + private boolean committing; + private int commitSeqno; + private long commitStarted; + private int commitFailures; + + public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, + WorkerConfig workerConfig) { + super(name); + this.task = task; + this.nextCommit = time.milliseconds() + + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + this.committing = false; + this.commitSeqno = 0; + this.commitStarted = -1; + this.commitFailures = 0; + } + + @Override + public void execute() { + while (getRunning()) { + iteration(); + } + // Make sure any uncommitted data has committed + task.commitOffsets(task.getTime().milliseconds(), true, -1, false); + } + + public void iteration() { + long now = task.getTime().milliseconds(); + + // Maybe commit + if (!committing && now >= nextCommit) { + synchronized (this) { + committing = true; + commitSeqno += 1; + commitStarted = now; + } + task.commitOffsets(now, false, commitSeqno, true); + nextCommit += task.getWorkerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + } + + // Check for timed out commits + long commitTimeout = commitStarted + task.getWorkerConfig().getLong( + WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + if (committing && now >= commitTimeout) { + log.warn("Commit of {} offsets timed out", this); + commitFailures++; + committing = false; + } + + // And process messages + long timeoutMs = Math.max(nextCommit - now, 0); + task.poll(timeoutMs); + } + + public void onCommitCompleted(Throwable error, long seqno) { + synchronized (this) { + if (commitSeqno != seqno) { + log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", + this, + seqno, commitSeqno); + } else { + if (error != null) { + log.error("Commit of {} offsets threw an unexpected exception: ", this, error); + commitFailures++; + } else { + log.debug("Finished {} offset commit successfully in {} ms", + this, task.getTime().milliseconds() - commitStarted); + commitFailures = 0; + } + committing = false; + } + } + } + + public int getCommitFailures() { + return commitFailures; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java new file mode 100644 index 0000000..c80adb4 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.copycat.cli.WorkerConfig; +import org.apache.kafka.copycat.source.SourceRecord; +import org.apache.kafka.copycat.source.SourceTask; +import org.apache.kafka.copycat.source.SourceTaskContext; +import org.apache.kafka.copycat.storage.Converter; +import org.apache.kafka.copycat.storage.OffsetStorageReader; +import org.apache.kafka.copycat.storage.OffsetStorageWriter; +import org.apache.kafka.copycat.util.ConnectorTaskId; +import org.apache.kafka.copycat.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * WorkerTask that uses a SourceTask to ingest data into Kafka. + */ +class WorkerSourceTask<K, V> implements WorkerTask { + private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class); + + private ConnectorTaskId id; + private SourceTask task; + private final Converter<K> keyConverter; + private final Converter<V> valueConverter; + private KafkaProducer<K, V> producer; + private WorkerSourceTaskThread workThread; + private OffsetStorageReader offsetReader; + private OffsetStorageWriter<K, V> offsetWriter; + private final WorkerConfig workerConfig; + private final Time time; + + // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because + // there is no IdentityHashSet. 
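+    // (If ProducerRecord were compared by value, two records with identical topic, key, and
+    // value would collide, and one send callback could untrack the other in-flight message;
+    // identity comparison avoids depending on ProducerRecord's equals semantics.)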
+ private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> + outstandingMessages; + // A second buffer is used while an offset flush is running + private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> + outstandingMessagesBacklog; + private boolean flushing; + + public WorkerSourceTask(ConnectorTaskId id, SourceTask task, + Converter<K> keyConverter, Converter<V> valueConverter, + KafkaProducer<K, V> producer, + OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter, + WorkerConfig workerConfig, Time time) { + this.id = id; + this.task = task; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + this.producer = producer; + this.offsetReader = offsetReader; + this.offsetWriter = offsetWriter; + this.workerConfig = workerConfig; + this.time = time; + + this.outstandingMessages = new IdentityHashMap<>(); + this.outstandingMessagesBacklog = new IdentityHashMap<>(); + this.flushing = false; + } + + @Override + public void start(Properties props) { + task.initialize(new SourceTaskContext(offsetReader)); + task.start(props); + workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id); + workThread.start(); + } + + @Override + public void stop() { + task.stop(); + commitOffsets(); + if (workThread != null) + workThread.startGracefulShutdown(); + } + + @Override + public boolean awaitStop(long timeoutMs) { + if (workThread != null) { + try { + boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); + if (!success) + workThread.forceShutdown(); + return success; + } catch (InterruptedException e) { + return false; + } + } + return true; + } + + @Override + public void close() { + // Nothing to do + } + + /** + * Send a batch of records. This is atomic up to the point of getting the messages into the + * Producer and recorded in our set of outstanding messages, so either all or none will be sent + * @param records + */ + private synchronized void sendRecords(List<SourceRecord> records) { + for (SourceRecord record : records) { + final ProducerRecord<K, V> producerRecord + = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(), + keyConverter.fromCopycatData(record.getKey()), + valueConverter.fromCopycatData(record.getValue())); + log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue()); + if (!flushing) { + outstandingMessages.put(producerRecord, producerRecord); + } else { + outstandingMessagesBacklog.put(producerRecord, producerRecord); + } + producer.send( + producerRecord, + new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + log.error("Failed to send record: ", e); + } else { + log.trace("Wrote record successfully: topic {} partition {} offset {}", + recordMetadata.topic(), recordMetadata.partition(), + recordMetadata.offset()); + } + recordSent(producerRecord); + } + }); + // Offsets are converted & serialized in the OffsetWriter + offsetWriter.setOffset(record.getSourcePartition(), record.getSourceOffset()); + } + } + + private synchronized void recordSent(final ProducerRecord<K, V> record) { + ProducerRecord<K, V> removed = outstandingMessages.remove(record); + // While flushing, we may also see callbacks for items in the backlog + if (removed == null && flushing) + removed = outstandingMessagesBacklog.remove(record); + // But if neither one had it, something is very wrong + if (removed == null) { + log.error("Saw callback for record that was not present in the outstanding message 
set: {}", record);
+        } else if (flushing && outstandingMessages.isEmpty()) {
+            // flush thread may be waiting on the outstanding messages to clear
+            this.notifyAll();
+        }
+    }
+
+    public boolean commitOffsets() {
+        long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+
+        long started = time.milliseconds();
+        long timeout = started + commitTimeoutMs;
+
+        synchronized (this) {
+            // First we need to make sure we snapshot everything in exactly the current state. This
+            // means both the current set of messages we're still waiting to finish, stored in this
+            // class, which setting flushing = true will handle by storing any new values into a new
+            // buffer; and the current set of user-specified offsets, stored in the
+            // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
+            flushing = true;
+            boolean flushStarted = offsetWriter.beginFlush();
+            // Still wait for any producer records to flush, even if there aren't any offsets to write
+            // to persistent storage
+
+            // Next we need to wait for all outstanding messages to finish sending
+            while (!outstandingMessages.isEmpty()) {
+                try {
+                    long timeoutMs = timeout - time.milliseconds();
+                    if (timeoutMs <= 0) {
+                        log.error(
+                                "Failed to flush {}, timed out while waiting for producer to flush outstanding "
+                                + "messages", this.toString());
+                        finishFailedFlush();
+                        return false;
+                    }
+                    this.wait(timeoutMs);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+
+            if (!flushStarted) {
+                // There was nothing in the offsets to process, but we still waited for the data in the
+                // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.
+                // flush time, which can be used for monitoring even if the connector doesn't record any
+                // offsets.
+                finishSuccessfulFlush();
+                log.debug("Finished {} offset commitOffsets successfully in {} ms",
+                        this, time.milliseconds() - started);
+                return true;
+            }
+        }
+
+        // Now we can actually flush the offsets to user storage.
+        Future<Void> flushFuture = offsetWriter.doFlush(new org.apache.kafka.copycat.util.Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                if (error != null) {
+                    log.error("Failed to flush {} offsets to storage: ", WorkerSourceTask.this, error);
+                } else {
+                    log.trace("Finished flushing {} offsets to storage", WorkerSourceTask.this);
+                }
+            }
+        });
+        // Very rare case: offsets were unserializable and we finished immediately, unable to store
+        // any data
+        if (flushFuture == null) {
+            finishFailedFlush();
+            return false;
+        }
+        try {
+            flushFuture.get(Math.max(timeout - time.milliseconds(), 0), TimeUnit.MILLISECONDS);
+            // There's a small race here where we can get the callback just as this times out (and log
+            // success), but then catch the exception below and cancel everything. This won't cause any
+            // errors, is only wasteful in this minor edge case, and the worst result is that the log
+            // could look a little confusing.
+ } catch (InterruptedException e) { + log.warn("Flush of {} offsets interrupted, cancelling", this); + finishFailedFlush(); + return false; + } catch (ExecutionException e) { + log.error("Flush of {} offsets threw an unexpected exception: ", this, e); + finishFailedFlush(); + return false; + } catch (TimeoutException e) { + log.error("Timed out waiting to flush {} offsets to storage", this); + finishFailedFlush(); + return false; + } + + finishSuccessfulFlush(); + log.debug("Finished {} commitOffsets successfully in {} ms", + this, time.milliseconds() - started); + return true; + } + + private synchronized void finishFailedFlush() { + offsetWriter.cancelFlush(); + outstandingMessages.putAll(outstandingMessagesBacklog); + outstandingMessagesBacklog.clear(); + flushing = false; + } + + private void finishSuccessfulFlush() { + // If we were successful, we can just swap instead of replacing items back into the original map + IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages; + outstandingMessages = outstandingMessagesBacklog; + outstandingMessagesBacklog = temp; + flushing = false; + } + + + private class WorkerSourceTaskThread extends ShutdownableThread { + public WorkerSourceTaskThread(String name) { + super(name); + } + + @Override + public void execute() { + try { + while (getRunning()) { + List<SourceRecord> records = task.poll(); + if (records == null) + continue; + sendRecords(records); + } + } catch (InterruptedException e) { + // Ignore and allow to exit. + } + } + } + + @Override + public String toString() { + return "WorkerSourceTask{" + + "id=" + id + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java new file mode 100644 index 0000000..af225bb --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import java.util.Properties; + +/** + * Handles processing for an individual task. This interface only provides the basic methods + * used by {@link Worker} to manage the tasks. Implementations combine a user-specified Task with + * Kafka to create a data flow. + */ +interface WorkerTask { + /** + * Start the Task + * @param props initial configuration + */ + void start(Properties props); + + /** + * Stop this task from processing messages. 
This method does not block; it only triggers
+     * shutdown. Use {@link #awaitStop} to block until completion.
+     */
+    void stop();
+
+    /**
+     * Wait for this task to finish stopping.
+     *
+     * @param timeoutMs maximum time to wait, in milliseconds
+     * @return true if successful, false if the timeout was reached
+     */
+    boolean awaitStop(long timeoutMs);
+
+    /**
+     * Close this task. This is different from {@link #stop} and {@link #awaitStop} in that the
+     * stop methods ensure processing has stopped but may leave resources allocated. This method
+     * should clean up all resources.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
new file mode 100644
index 0000000..0e14015
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.copycat.connector.ConnectorContext;
+
+/**
+ * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
+ * in a single process.
+ */
+class StandaloneConnectorContext implements ConnectorContext {
+
+    private StandaloneHerder herder;
+    private String connectorName;
+
+    public StandaloneConnectorContext(StandaloneHerder herder, String connectorName) {
+        this.herder = herder;
+        this.connectorName = connectorName;
+    }
+
+    @Override
+    public void requestTaskReconfiguration() {
+        // This is trivial to forward since there is only one herder and it's in memory in this
+        // process
+        herder.requestTaskReconfiguration(connectorName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
new file mode 100644
index 0000000..2ed9183
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Single process, in-memory "herder". Useful for a standalone copycat process.
+ */
+public class StandaloneHerder implements Herder {
+    private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
+
+    private Worker worker;
+    private HashMap<String, ConnectorState> connectors = new HashMap<>();
+
+    public StandaloneHerder(Worker worker) {
+        this.worker = worker;
+    }
+
+    public synchronized void start() {
+        log.info("Herder starting");
+        log.info("Herder started");
+    }
+
+    public synchronized void stop() {
+        log.info("Herder stopping");
+
+        // There's no coordination/hand-off to do here since this is all standalone. Instead, we
+        // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
+        // the tasks.
+        for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
+            ConnectorState state = entry.getValue();
+            stopConnector(state);
+        }
+        connectors.clear();
+
+        log.info("Herder stopped");
+    }
+
+    @Override
+    public synchronized void addConnector(Properties connectorProps,
+                                          Callback<String> callback) {
+        try {
+            ConnectorState connState = createConnector(connectorProps);
+            if (callback != null)
+                callback.onCompletion(null, connState.name);
+            // This should always be a new job, create jobs from scratch
+            createConnectorTasks(connState);
+        } catch (CopycatException e) {
+            if (callback != null)
+                callback.onCompletion(e, null);
+        }
+    }
+
+    @Override
+    public synchronized void deleteConnector(String name, Callback<Void> callback) {
+        try {
+            destroyConnector(name);
+            if (callback != null)
+                callback.onCompletion(null, null);
+        } catch (CopycatException e) {
+            if (callback != null)
+                callback.onCompletion(e, null);
+        }
+    }
+
+    // Creates and configures the connector. Does not set up any tasks.
+    private ConnectorState createConnector(Properties connectorProps) {
+        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        log.info("Creating connector {} of type {}", connName, className);
+        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
+        Properties configs = connConfig.getUnusedProperties();
+
+        if (connectors.containsKey(connName)) {
+            log.error("Ignoring request to create connector due to conflicting connector name");
+            throw new CopycatException("Connector with name " + connName + " already exists");
+        }
+
+        final Connector connector;
+        try {
+            connector = instantiateConnector(className);
+        } catch (Throwable t) {
+            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+            // may be caused by user code
+            throw new CopycatException("Failed to create connector instance", t);
+        }
+        connector.initialize(new StandaloneConnectorContext(this, connName));
+        try {
+            connector.start(configs);
+        } catch (CopycatException e) {
+            throw new CopycatException("Connector threw an exception while starting", e);
+        }
+        ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
+        connectors.put(connName, state);
+
+        log.info("Finished creating connector {}", connName);
+
+        return state;
+    }
+
+    private static Connector instantiateConnector(String className) {
+        try {
+            return Utils.newInstance(className, Connector.class);
+        } catch (ClassNotFoundException e) {
+            throw new CopycatException("Couldn't instantiate connector class", e);
+        }
+    }
+
+    private void destroyConnector(String connName) {
+        log.info("Destroying connector {}", connName);
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            log.error("Failed to destroy connector {} because it does not exist", connName);
+            throw new CopycatException("Connector does not exist");
+        }
+
+        stopConnector(state);
+        connectors.remove(state.name);
+
+        log.info("Finished destroying connector {}", connName);
+    }
+
+    // Stops a connector's tasks, then the connector
+    private void stopConnector(ConnectorState state) {
+        removeConnectorTasks(state);
+        try {
+            state.connector.stop();
+        } catch (CopycatException e) {
+            log.error("Error shutting down connector {}: ", state.connector, e);
+        }
+    }
+
+    private void createConnectorTasks(ConnectorState state) {
+        String taskClassName = state.connector.getTaskClass().getName();
+
+        log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
+
+        List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);
+
+        // Generate the final configs, including framework provided settings
+        Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
+        for (int i = 0; i < taskConfigs.size(); i++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+            Properties config = taskConfigs.get(i);
+            // TODO: This probably shouldn't be in the Herder.
It's nice to have Copycat ensure the list of topics + // is automatically provided to tasks since it is required by the framework, but this + String subscriptionTopics = Utils.join(state.inputTopics, ","); + if (state.connector instanceof SinkConnector) { + // Make sure we don't modify the original since the connector may reuse it internally + Properties configForSink = new Properties(); + configForSink.putAll(config); + configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics); + config = configForSink; + } + taskProps.put(taskId, config); + } + + // And initiate the tasks + for (int i = 0; i < taskConfigs.size(); i++) { + ConnectorTaskId taskId = new ConnectorTaskId(state.name, i); + Properties config = taskProps.get(taskId); + try { + worker.addTask(taskId, taskClassName, config); + // We only need to store the task IDs so we can clean up. + state.tasks.add(taskId); + } catch (Throwable e) { + log.error("Failed to add task {}: ", taskId, e); + // Swallow this so we can continue updating the rest of the tasks + // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task + // that died after starting successfully. + } + } + } + + private void removeConnectorTasks(ConnectorState state) { + Iterator<ConnectorTaskId> taskIter = state.tasks.iterator(); + while (taskIter.hasNext()) { + ConnectorTaskId taskId = taskIter.next(); + try { + worker.stopTask(taskId); + taskIter.remove(); + } catch (CopycatException e) { + log.error("Failed to stop task {}: ", taskId, e); + // Swallow this so we can continue stopping the rest of the tasks + // FIXME: Forcibly kill the task? + } + } + } + + private void updateConnectorTasks(ConnectorState state) { + removeConnectorTasks(state); + createConnectorTasks(state); + } + + /** + * Requests reconfiguration of the task. This should only be triggered by + * {@link StandaloneConnectorContext}. + * + * @param connName name of the connector that should be reconfigured + */ + public synchronized void requestTaskReconfiguration(String connName) { + ConnectorState state = connectors.get(connName); + if (state == null) { + log.error("Task that requested reconfiguration does not exist: {}", connName); + return; + } + updateConnectorTasks(state); + } + + + private static class ConnectorState { + public String name; + public Connector connector; + public int maxTasks; + public List<String> inputTopics; + Set<ConnectorTaskId> tasks; + + public ConnectorState(String name, Connector connector, int maxTasks, + List<String> inputTopics) { + this.name = name; + this.connector = connector; + this.maxTasks = maxTasks; + this.inputTopics = inputTopics; + this.tasks = new HashSet<>(); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java new file mode 100644 index 0000000..dfa9e78 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
new file mode 100644
index 0000000..dfa9e78
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves
+ * similarly to a real backing store, operations are executed asynchronously on a background thread.
+ */
+public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(FileOffsetBackingStore.class);
+
+    public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
+    private File file;
+
+    public FileOffsetBackingStore() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        super.configure(props);
+        String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
+        file = new File(filename);
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        log.info("Starting FileOffsetBackingStore with file {}", file);
+        load();
+    }
+
+    @Override
+    public synchronized void stop() {
+        super.stop();
+        // Nothing to do since this doesn't maintain any outstanding connections/data
+        log.info("Stopped FileOffsetBackingStore");
+    }
+
+    @SuppressWarnings("unchecked")
+    private void load() {
+        // try-with-resources ensures the stream is closed even if deserialization fails
+        try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(file))) {
+            Object obj = is.readObject();
+            if (!(obj instanceof HashMap))
+                throw new CopycatException("Expected HashMap but found " + obj.getClass());
+            HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
+            data = new HashMap<>();
+            for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
+                HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
+                for (Map.Entry<byte[], byte[]> mapEntry : entry.getValue().entrySet()) {
+                    ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
+                    ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
+                    converted.put(key, value);
+                }
+                data.put(entry.getKey(), converted);
+            }
+        } catch (FileNotFoundException | EOFException e) {
+            // FileNotFoundException: the file may not exist yet, e.g. on a fresh deployment.
+            // EOFException: the file may be empty or truncated; start with no stored offsets.
+        } catch (IOException | ClassNotFoundException e) {
+            throw new CopycatException(e);
+        }
+    }
+
+    @Override
+    protected void save() {
+        try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
+            HashMap<String, Map<byte[], byte[]>> raw = new HashMap<>();
+            for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> entry : data.entrySet()) {
+                HashMap<byte[], byte[]> converted = new HashMap<>();
+                for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : entry.getValue().entrySet()) {
+                    byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
+                    byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
+                    converted.put(key, value);
+                }
+                raw.put(entry.getKey(), converted);
+            }
+            os.writeObject(raw);
+        } catch (IOException e) {
+            throw new CopycatException(e);
+        }
+    }
+}
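A brief sketch of the round trip this store implements, with a made-up namespace, key, and file path (imports elided; the blocking Future.get() calls would also need InterruptedException/ExecutionException handling in real code):

    FileOffsetBackingStore store = new FileOffsetBackingStore();
    store.configure(Collections.singletonMap(
            FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/copycat.offsets"));
    store.start();  // loads any offsets previously saved to the file

    ByteBuffer key = ByteBuffer.wrap("partition-0".getBytes(StandardCharsets.UTF_8));
    ByteBuffer value = ByteBuffer.wrap("42".getBytes(StandardCharsets.UTF_8));
    store.set("my-connector", Collections.singletonMap(key, value), null).get();  // triggers save()

    Map<ByteBuffer, ByteBuffer> restored =
            store.get("my-connector", Collections.singletonList(key), null).get();
    store.stop();

Each set() rewrites the entire file via Java serialization, which is adequate for the standalone, single-process use this class targets.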

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
new file mode 100644
index 0000000..6ffba58
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this
+ * behaves similarly to a real backing store, operations are executed asynchronously on a
+ * background thread.
+ */
+public class MemoryOffsetBackingStore implements OffsetBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
+
+    protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<>();
+    protected ExecutorService executor = Executors.newSingleThreadExecutor();
+
+    public MemoryOffsetBackingStore() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+    }
+
+    @Override
+    public synchronized void start() {
+    }
+
+    @Override
+    public synchronized void stop() {
+        // No outstanding connections or data to flush, but don't leave the background
+        // thread running
+        executor.shutdown();
+    }
+
+    @Override
+    public Future<Map<ByteBuffer, ByteBuffer>> get(
+            final String namespace, final Collection<ByteBuffer> keys,
+            final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+        return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public Map<ByteBuffer, ByteBuffer> call() throws Exception {
+                Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
+                synchronized (MemoryOffsetBackingStore.this) {
+                    Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
+                    if (namespaceData == null)
+                        return result;
+                    for (ByteBuffer key : keys) {
+                        result.put(key, namespaceData.get(key));
+                    }
+                }
+                if (callback != null)
+                    callback.onCompletion(null, result);
+                return result;
+            }
+        });
+    }
+
+    @Override
+    public Future<Void> set(final String namespace, final Map<ByteBuffer, ByteBuffer> values,
+                            final Callback<Void> callback) {
+        return executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                synchronized (MemoryOffsetBackingStore.this) {
+                    Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
+                    if (namespaceData == null) {
+                        namespaceData = new HashMap<>();
+                        data.put(namespace, namespaceData);
+                    }
+                    for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+                        namespaceData.put(entry.getKey(), entry.getValue());
+                    }
+                    save();
+                }
+                if (callback != null)
+                    callback.onCompletion(null, null);
+                return null;
+            }
+        });
+    }
+
+    // Hook to allow subclasses to persist data
+    protected void save() {
+
+    }
+}
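The empty save() at the bottom is the template-method seam that FileOffsetBackingStore plugs into above. A sketch of another hypothetical subclass using the same seam, persisting by logging only:

    public class LoggingOffsetBackingStore extends MemoryOffsetBackingStore {
        private static final Logger log = LoggerFactory.getLogger(LoggingOffsetBackingStore.class);

        @Override
        protected void save() {
            // Invoked from set() with the store's monitor held, so `data` can be read safely here;
            // a real implementation would serialize it to durable storage instead of logging.
            log.info("Would persist offsets for {} namespace(s)", data.size());
        }
    }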

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
new file mode 100644
index 0000000..e8cb2ae
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.copycat.util.Callback;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * OffsetBackingStore is an interface for storage backends that store key-value data. The backing
+ * store doesn't need to handle serialization or deserialization; it only needs to support
+ * reading and writing bytes. Since these operations are expected to require network access, only
+ * bulk operations are supported.
+ * </p>
+ * <p>
+ * Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances
+ * that are associated with individual tasks, all operations include a namespace which should be
+ * used to isolate different key spaces.
+ * </p>
+ */
+public interface OffsetBackingStore extends Configurable {
+
+    /**
+     * Start this offset store.
+     */
+    void start();
+
+    /**
+     * Stop the backing store. Implementations should attempt to shut down gracefully, but not
+     * block indefinitely.
+     */
+    void stop();
+
+    /**
+     * Get the values for the specified keys.
+     * @param namespace prefix for the keys in this request
+     * @param keys list of keys to look up
+     * @param callback callback to invoke on completion
+     * @return future for the resulting map from key to value
+     */
+    Future<Map<ByteBuffer, ByteBuffer>> get(
+            String namespace, Collection<ByteBuffer> keys,
+            Callback<Map<ByteBuffer, ByteBuffer>> callback);
+
+    /**
+     * Set the specified keys and values.
+     * @param namespace prefix for the keys in this request
+     * @param values map from key to value
+     * @param callback callback to invoke on completion
+     * @return void future for the operation
+     */
+    Future<Void> set(String namespace, Map<ByteBuffer, ByteBuffer> values,
+                     Callback<Void> callback);
+}
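Because both methods return a Future and also accept an optional callback, callers can pick either completion style; a brief sketch against any OffsetBackingStore implementation (the namespace, keys, and process() handler are illustrative):

    // Blocking: wait for the result directly
    Map<ByteBuffer, ByteBuffer> values = store.get("connector-1", keys, null).get();

    // Asynchronous: let the store invoke the callback when the read completes
    store.get("connector-1", keys, new Callback<Map<ByteBuffer, ByteBuffer>>() {
        @Override
        public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
            if (error == null)
                process(result);  // hypothetical handler
        }
    });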

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
new file mode 100644
index 0000000..7a050dc
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter, which is used directly as a
+ * concrete class, this is split into an interface and an implementation only because the
+ * interface needs to be included in the public API package.
+ */
+public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
+    private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
+
+    private final OffsetBackingStore backingStore;
+    private final String namespace;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private final Serializer<K> keySerializer;
+    private final Deserializer<V> valueDeserializer;
+
+    public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
+                                   Converter<K> keyConverter, Converter<V> valueConverter,
+                                   Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
+        this.backingStore = backingStore;
+        this.namespace = namespace;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.keySerializer = keySerializer;
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    @Override
+    public Object getOffset(Object partition) {
+        return getOffsets(Arrays.asList(partition)).get(partition);
+    }
+
+    @Override
+    public Map<Object, Object> getOffsets(Collection<Object> partitions) {
+        // Serialize keys so the backing store can work with them
+        Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
+        for (Object key : partitions) {
+            try {
+                byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key));
+                ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
+                serializedToOriginal.put(keyBuffer, key);
+            } catch (Throwable t) {
+                log.error("CRITICAL: Failed to serialize partition key when getting offsets for task with "
+                        + "namespace {}. No value for this data will be returned, which may break the "
+                        + "task or cause it to skip some data.", namespace, t);
+            }
+        }
+
+        // Get serialized key -> serialized value from the backing store
+        Map<ByteBuffer, ByteBuffer> raw;
+        try {
+            raw = backingStore.get(namespace, serializedToOriginal.keySet(), null).get();
+        } catch (Exception e) {
+            log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
+            throw new CopycatException("Failed to fetch offsets.", e);
+        }
+
+        // Deserialize all the values and map back to the original keys
+        Map<Object, Object> result = new HashMap<>(partitions.size());
+        for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
+            try {
+                // Since null could be a valid key, explicitly check whether the map contains the key
+                if (!serializedToOriginal.containsKey(rawEntry.getKey())) {
+                    log.error("Should be able to map {} back to a requested partition-offset key, backing "
+                            + "store may have returned invalid data", rawEntry.getKey());
+                    continue;
+                }
+                Object origKey = serializedToOriginal.get(rawEntry.getKey());
+                // A missing offset comes back as a null value; don't try to deserialize it
+                Object deserializedValue = (rawEntry.getValue() != null)
+                        ? valueConverter.toCopycatData(
+                                valueDeserializer.deserialize(namespace, rawEntry.getValue().array()))
+                        : null;
+                result.put(origKey, deserializedValue);
+            } catch (Throwable t) {
+                log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with "
+                        + "namespace {}. No value for this data will be returned, which may break the "
+                        + "task or cause it to skip some data. This could either be due to an error in "
+                        + "the connector implementation or incompatible schema.", namespace, t);
+            }
+        }
+
+        return result;
+    }
+}
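Finally, a hedged sketch of how a source task might use this reader to resume from a stored position. The backing store, converters, serializers, and the shape of the partition key are all illustrative; only the constructor and getOffset() come from the code above:

    OffsetStorageReader reader = new OffsetStorageReaderImpl<Object, Object>(
            backingStore, "my-source-connector",
            keyConverter, valueConverter, keySerializer, valueDeserializer);

    Map<String, String> partition = Collections.singletonMap("file", "/var/log/app.log");
    Object lastOffset = reader.getOffset(partition);
    if (lastOffset != null) {
        // resume reading from the recorded position instead of from the beginning
    }

Note that getOffset() blocks on the backing store's Future, so tasks should treat it as a potentially slow call and fetch offsets in bulk with getOffsets() where possible.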