http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java new file mode 100644 index 0000000..927f62b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -0,0 +1,246 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.streams.processor.internals.ProcessorStateManager.CHECKPOINT_FILE_NAME;

/**
 * This class is responsible for the initialization, restoration, closing, flushing etc
 * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
 */
public class GlobalStateManagerImpl implements GlobalStateManager {
    private static final int MAX_LOCK_ATTEMPTS = 5;
    private static final Logger log = LoggerFactory.getLogger(GlobalStateManagerImpl.class);

    private final ProcessorTopology topology;
    private final Consumer<byte[], byte[]> consumer;
    private final StateDirectory stateDirectory;
    // insertion-ordered so close() iterates the stores in registration order
    private final Map<String, StateStore> stores = new LinkedHashMap<>();
    private final File baseDir;
    private final OffsetCheckpoint checkpoint;
    private final Set<String> globalStoreNames = new HashSet<>();
    // offsets read from the checkpoint file during initialize(); updated as stores are
    // restored and on close(); written back out via writeCheckpoints()
    private HashMap<TopicPartition, Long> checkpointableOffsets;

    public GlobalStateManagerImpl(final ProcessorTopology topology,
                                  final Consumer<byte[], byte[]> consumer,
                                  final StateDirectory stateDirectory) {
        this.topology = topology;
        this.consumer = consumer;
        this.stateDirectory = stateDirectory;
        this.baseDir = stateDirectory.globalStateDir();
        this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
    }

    /**
     * Lock the global state directory, read (and delete) the offset checkpoint file,
     * and initialize all global state stores declared by the topology.
     *
     * @param processorContext the context passed to each store's {@code init}
     * @return an unmodifiable set of the names of all global state stores
     * @throws LockException    if the global state directory cannot be locked
     * @throws StreamsException if the checkpoint file cannot be read
     */
    @Override
    public Set<String> initialize(final InternalProcessorContext processorContext) {
        try {
            if (!stateDirectory.lockGlobalState(MAX_LOCK_ATTEMPTS)) {
                throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
            }
        } catch (IOException e) {
            // FIX: attach the underlying IOException as the cause instead of discarding it
            throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir), e);
        }

        try {
            this.checkpointableOffsets = new HashMap<>(checkpoint.read());
            // delete so that a crash during restoration can't leave stale offsets behind;
            // the file is re-written on close()
            checkpoint.delete();
        } catch (IOException e) {
            try {
                stateDirectory.unlockGlobalState();
            } catch (IOException e1) {
                // FIX: log the unlock failure (e1), not the checkpoint-read failure (e);
                // the read failure is propagated below
                log.error("failed to unlock the global state directory", e1);
            }
            throw new StreamsException("Failed to read checkpoints for global state stores", e);
        }

        final List<StateStore> stateStores = topology.globalStateStores();
        for (final StateStore stateStore : stateStores) {
            globalStoreNames.add(stateStore.name());
            // init() calls back into register(...) below, which performs the restoration
            // NOTE(review): if init() throws here the global state lock is never released —
            // confirm the caller handles that, or add an unlock-on-failure path
            stateStore.init(processorContext, stateStore);
        }
        return Collections.unmodifiableSet(globalStoreNames);
    }

    @Override
    public StateStore getGlobalStore(final String name) {
        return stores.get(name);
    }

    @Override
    public StateStore getStore(final String name) {
        return getGlobalStore(name);
    }

    public File baseDir() {
        return baseDir;
    }

    /**
     * Restore and register a global state store. Invoked by the store itself from
     * {@link StateStore#init} during {@link #initialize}.
     *
     * @param store                the store being registered
     * @param ignored              change-logging flag; not applicable to global stores
     * @param stateRestoreCallback callback used to replay the source-topic records
     * @throws IllegalArgumentException if the store is already registered, is not a known
     *                                  global store, or the callback is {@code null}
     */
    public void register(final StateStore store,
                         final boolean ignored,
                         final StateRestoreCallback stateRestoreCallback) {

        if (stores.containsKey(store.name())) {
            throw new IllegalArgumentException(String.format("Global Store %s has already been registered", store.name()));
        }

        if (!globalStoreNames.contains(store.name())) {
            throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
        }

        if (stateRestoreCallback == null) {
            throw new IllegalArgumentException(String.format("The stateRestoreCallback provided for store %s was null", store.name()));
        }

        log.info("restoring state for global store {}", store.name());
        final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
        consumer.assign(topicPartitions);
        final Map<TopicPartition, Long> highWatermarks = consumer.endOffsets(topicPartitions);
        try {
            restoreState(stateRestoreCallback, topicPartitions, highWatermarks);
            stores.put(store.name(), store);
        } finally {
            // always release the assignment, even if restoration failed
            consumer.assign(Collections.<TopicPartition>emptyList());
        }

    }

    /**
     * Resolve all partitions of the store's source topic.
     *
     * @throws StreamsException if the topic has no available partitions
     */
    private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
        final String sourceTopic = topology.sourceStoreToSourceTopic().get(store.name());
        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(sourceTopic);
        if (partitionInfos == null || partitionInfos.isEmpty()) {
            throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
        }

        final List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partition : partitionInfos) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        return topicPartitions;
    }

    /**
     * Replay each partition from its checkpointed offset (or the beginning when no
     * checkpoint exists) up to the current high watermark, feeding every record to the
     * store's restore callback and recording the restored offsets.
     */
    private void restoreState(final StateRestoreCallback stateRestoreCallback,
                              final List<TopicPartition> topicPartitions,
                              final Map<TopicPartition, Long> highWatermarks) {
        for (final TopicPartition topicPartition : topicPartitions) {
            consumer.assign(Collections.singletonList(topicPartition));
            final Long checkpoint = checkpointableOffsets.get(topicPartition);
            if (checkpoint != null) {
                consumer.seek(topicPartition, checkpoint);
            } else {
                consumer.seekToBeginning(Collections.singletonList(topicPartition));
            }

            long offset = consumer.position(topicPartition);
            // non-null by construction: highWatermarks comes from endOffsets(topicPartitions)
            // for exactly these partitions
            final Long highWatermark = highWatermarks.get(topicPartition);

            while (offset < highWatermark) {
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    offset = record.offset() + 1;
                    stateRestoreCallback.restore(record.key(), record.value());
                }
            }
            checkpointableOffsets.put(topicPartition, offset);
        }
    }

    /**
     * Flush every registered global store.
     *
     * @throws ProcessorStateException wrapping the first flush failure
     */
    public void flush(final InternalProcessorContext context) {
        log.debug("Flushing all global stores registered in the state manager");
        for (StateStore store : this.stores.values()) {
            try {
                log.trace("Flushing global store={}", store.name());
                store.flush();
            } catch (Exception e) {
                throw new ProcessorStateException(String.format("Failed to flush global state store %s", store.name()), e);
            }
        }
    }


    /**
     * Close all registered stores, write the offset checkpoint, and release the global
     * state directory lock (always, via finally).
     *
     * @param offsets the offsets to merge into the checkpoint file
     * @throws ProcessorStateException if one or more stores failed to close; all stores
     *                                 are still attempted
     */
    @Override
    public void close(final Map<TopicPartition, Long> offsets) throws IOException {
        try {
            if (stores.isEmpty()) {
                return;
            }
            final StringBuilder closeFailed = new StringBuilder();
            for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
                log.debug("Closing global storage engine {}", entry.getKey());
                try {
                    entry.getValue().close();
                } catch (Exception e) {
                    // keep closing the remaining stores; aggregate the failures
                    log.error("Failed to close global state store {}", entry.getKey(), e);
                    closeFailed.append("Failed to close global state store:")
                            .append(entry.getKey())
                            .append(". Reason: ")
                            .append(e.getMessage())
                            .append("\n");
                }
            }
            stores.clear();
            if (closeFailed.length() > 0) {
                throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed);
            }
            // NOTE(review): intentionally skipped when any store failed to close — the
            // checkpoint is only written after a fully clean shutdown
            writeCheckpoints(offsets);
        } finally {
            stateDirectory.unlockGlobalState();
        }
    }

    /** Merge the given offsets into the checkpointable set and persist them (best effort). */
    private void writeCheckpoints(final Map<TopicPartition, Long> offsets) {
        if (!offsets.isEmpty()) {
            checkpointableOffsets.putAll(offsets);
            try {
                checkpoint.write(checkpointableOffsets);
            } catch (IOException e) {
                // best effort: a missing checkpoint only costs a full restore on restart
                log.warn("failed to write offsets checkpoint for global stores", e);
            }
        }
    }

    @Override
    public Map<TopicPartition, Long> checkpointedOffsets() {
        return Collections.unmodifiableMap(checkpointableOffsets);
    }


}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java new file mode 100644 index 0000000..9723f3c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -0,0 +1,110 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Updates the state for all Global State Stores.
 */
public class GlobalStateUpdateTask implements GlobalStateMaintainer {

    // FIX: static — this value pair never touches the enclosing instance, so a
    // non-static inner class would only add a hidden reference to the outer task
    private static class SourceNodeAndDeserializer {
        private final SourceNode sourceNode;
        private final RecordDeserializer deserializer;

        SourceNodeAndDeserializer(final SourceNode sourceNode,
                                  final RecordDeserializer deserializer) {
            this.sourceNode = sourceNode;
            this.deserializer = deserializer;
        }
    }

    private final ProcessorTopology topology;
    private final InternalProcessorContext processorContext;
    // highest applied offset + 1 per partition; handed to the state manager on close()
    private final Map<TopicPartition, Long> offsets = new HashMap<>();
    // source topic -> (source node, deserializer), built during initialize()
    private final Map<String, SourceNodeAndDeserializer> deserializers = new HashMap<>();
    private final GlobalStateManager stateMgr;


    public GlobalStateUpdateTask(final ProcessorTopology topology,
                                 final InternalProcessorContext processorContext,
                                 final GlobalStateManager stateMgr) {

        this.topology = topology;
        this.stateMgr = stateMgr;
        this.processorContext = processorContext;
    }

    /**
     * Initialize the state manager and the topology, and build the per-topic
     * deserializer table.
     *
     * @return the checkpointed offsets the caller should resume consuming from
     */
    @SuppressWarnings("unchecked")
    public Map<TopicPartition, Long> initialize() {
        final Set<String> storeNames = stateMgr.initialize(processorContext);
        final Map<String, String> storeNameToTopic = topology.sourceStoreToSourceTopic();
        for (final String storeName : storeNames) {
            final String sourceTopic = storeNameToTopic.get(storeName);
            final SourceNode source = topology.source(sourceTopic);
            deserializers.put(sourceTopic, new SourceNodeAndDeserializer(source, new SourceNodeRecordDeserializer(source)));
        }
        initTopology();
        processorContext.initialized();
        return stateMgr.checkpointedOffsets();
    }


    /**
     * Deserialize a raw record, route it through its source node, and track the
     * next offset to consume for the record's partition.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void update(final ConsumerRecord<byte[], byte[]> record) {
        // NOTE(review): assumes record.topic() is a known global source topic — a record
        // from any other topic would NPE here; confirm the caller only polls these topics
        final SourceNodeAndDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserializer.deserialize(record);
        final ProcessorRecordContext recordContext =
                new ProcessorRecordContext(deserialized.timestamp(),
                        deserialized.offset(),
                        deserialized.partition(),
                        deserialized.topic());
        processorContext.setRecordContext(recordContext);
        processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode);
        sourceNodeAndDeserializer.sourceNode.process(deserialized.key(), deserialized.value());
        offsets.put(new TopicPartition(record.topic(), record.partition()), deserialized.offset() + 1);
    }

    public void flushState() {
        stateMgr.flush(processorContext);
    }

    public void close() throws IOException {
        stateMgr.close(offsets);
    }

    /** Initialize every processor node, always clearing the current node afterwards. */
    private void initTopology() {
        for (ProcessorNode node : this.topology.processors()) {
            processorContext.setCurrentNode(node);
            try {
                node.init(this.processorContext);
            } finally {
                processorContext.setCurrentNode(null);
            }
        }
    }


} http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java new file mode 100644 index 0000000..b4e15f2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.state.internals.ThreadCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * This is the thread responsible for keeping all Global State Stores updated. 
+ * It delegates most of the responsibility to the internal class StateConsumer + */ +public class GlobalStreamThread extends Thread { + + private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class); + private final StreamsConfig config; + private final Consumer<byte[], byte[]> consumer; + private final StateDirectory stateDirectory; + private final Time time; + private final ThreadCache cache; + private final StreamsMetrics streamsMetrics; + private final ProcessorTopology topology; + private volatile boolean running = false; + private volatile StreamsException startupException; + + public GlobalStreamThread(final ProcessorTopology topology, + final StreamsConfig config, + final Consumer<byte[], byte[]> globalConsumer, + final StateDirectory stateDirectory, + final Metrics metrics, + final Time time, + final String clientId + ) { + super("GlobalStreamThread"); + this.topology = topology; + this.config = config; + this.consumer = globalConsumer; + this.stateDirectory = stateDirectory; + this.time = time; + long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / + (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1)); + final String threadClientId = clientId + "-" + getName(); + this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId)); + this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics); + } + + static class StateConsumer { + private final Consumer<byte[], byte[]> consumer; + private final GlobalStateMaintainer stateMaintainer; + private final Time time; + private final long pollMs; + private final long flushInterval; + + private long lastFlush; + + StateConsumer(final Consumer<byte[], byte[]> consumer, + final GlobalStateMaintainer stateMaintainer, + final Time time, + final long pollMs, + final long flushInterval) { + this.consumer = consumer; + this.stateMaintainer = stateMaintainer; + this.time 
= time; + this.pollMs = pollMs; + this.flushInterval = flushInterval; + } + + void initialize() { + final Map<TopicPartition, Long> partitionOffsets = stateMaintainer.initialize(); + consumer.assign(partitionOffsets.keySet()); + for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) { + consumer.seek(entry.getKey(), entry.getValue()); + } + lastFlush = time.milliseconds(); + } + + void pollAndUpdate() { + final ConsumerRecords<byte[], byte[]> received = consumer.poll(pollMs); + for (ConsumerRecord<byte[], byte[]> record : received) { + stateMaintainer.update(record); + } + final long now = time.milliseconds(); + if (flushInterval >= 0 && now >= lastFlush + flushInterval) { + stateMaintainer.flushState(); + lastFlush = now; + } + } + + public void close() throws IOException { + // just log an error if the consumer throws an exception during close + // so we can always attempt to close the state stores. + try { + consumer.close(); + } catch (Exception e) { + log.error("Failed to cleanly close GlobalStreamThread consumer", e); + } + + stateMaintainer.close(); + + } + } + + @Override + public void run() { + final StateConsumer stateConsumer = initialize(); + if (stateConsumer == null) { + return; + } + + try { + while (running) { + stateConsumer.pollAndUpdate(); + } + log.debug("Shutting down GlobalStreamThread at user request"); + } finally { + try { + stateConsumer.close(); + } catch (IOException e) { + log.error("Failed to cleanly shutdown GlobalStreamThread", e); + } + } + } + + private StateConsumer initialize() { + try { + final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory); + final StateConsumer stateConsumer + = new StateConsumer(consumer, + new GlobalStateUpdateTask(topology, + new GlobalProcessorContextImpl( + config, + stateMgr, + streamsMetrics, + cache), + stateMgr), + time, + config.getLong(StreamsConfig.POLL_MS_CONFIG), + config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + 
stateConsumer.initialize(); + running = true; + return stateConsumer; + } catch (StreamsException e) { + startupException = e; + } catch (Exception e) { + startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", e); + } + return null; + } + + @Override + public synchronized void start() { + super.start(); + while (!running) { + Utils.sleep(1); + if (startupException != null) { + throw startupException; + } + } + } + + + public void close() { + running = false; + } + + public boolean stillRunning() { + return running; + } + + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 016964b..c78ed09 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -43,8 +43,14 @@ public interface InternalProcessorContext extends ProcessorContext { void setCurrentNode(ProcessorNode currentNode); ProcessorNode currentNode(); + /** * Get the thread-global cache */ ThreadCache getCache(); + + /** + * Mark this contex as being initialized + */ + void initialized(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 6496b88..0224cdc 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -114,7 +114,7 @@ public class PartitionGroup { RecordQueue recordQueue = partitionQueues.get(partition); int oldSize = recordQueue.size(); - int newSize = recordQueue.addRawRecords(rawRecords, timestampExtractor); + int newSize = recordQueue.addRawRecords(rawRecords); // add this record queue to be considered for processing in the future if it was empty before if (oldSize == 0 && newSize > 0) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 31bb7c6..f761f16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -17,73 +17,34 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.TopologyBuilderException; -import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; -import java.io.File; import java.util.List; -import java.util.Map; -public class ProcessorContextImpl implements InternalProcessorContext, RecordCollector.Supplier { +public class ProcessorContextImpl extends AbstractProcessorContext implements 
RecordCollector.Supplier { - public static final String NONEXIST_TOPIC = "__null_topic__"; - - private final TaskId id; private final StreamTask task; - private final StreamsMetrics metrics; private final RecordCollector collector; - private final ProcessorStateManager stateMgr; - - private final StreamsConfig config; - private final Serde<?> keySerde; - private final Serde<?> valSerde; - private final ThreadCache cache; - private boolean initialized; - private RecordContext recordContext; - private ProcessorNode currentNode; - public ProcessorContextImpl(TaskId id, - StreamTask task, - StreamsConfig config, - RecordCollector collector, - ProcessorStateManager stateMgr, - StreamsMetrics metrics, - final ThreadCache cache) { - this.id = id; + ProcessorContextImpl(final TaskId id, + final StreamTask task, + final StreamsConfig config, + final RecordCollector collector, + final ProcessorStateManager stateMgr, + final StreamsMetrics metrics, + final ThreadCache cache) { + super(id, task.applicationId(), config, metrics, stateMgr, cache); this.task = task; - this.metrics = metrics; this.collector = collector; - this.stateMgr = stateMgr; - - this.config = config; - this.keySerde = config.keySerde(); - this.valSerde = config.valueSerde(); - this.cache = cache; - this.initialized = false; - } - - public void initialized() { - this.initialized = true; } public ProcessorStateManager getStateMgr() { - return stateMgr; - } - - @Override - public TaskId taskId() { - return id; - } - - @Override - public String applicationId() { - return task.applicationId(); + return (ProcessorStateManager) stateManager; } @Override @@ -91,142 +52,66 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol return this.collector; } - @Override - public Serde<?> keySerde() { - return this.keySerde; - } - - @Override - public Serde<?> valueSerde() { - return this.valSerde; - } - - @Override - public File stateDir() { - return stateMgr.baseDir(); - } - - @Override - 
public StreamsMetrics metrics() { - return metrics; - } - - /** - * @throws IllegalStateException if this method is called before {@link #initialized()} - */ - @Override - public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { - if (initialized) - throw new IllegalStateException("Can only create state stores during initialization."); - - stateMgr.register(store, loggingEnabled, stateRestoreCallback); - } - /** * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node */ @Override - public StateStore getStateStore(String name) { - if (currentNode == null) + public StateStore getStateStore(final String name) { + if (currentNode() == null) { throw new TopologyBuilderException("Accessing from an unknown node"); - - if (!currentNode.stateStores.contains(name)) { - throw new TopologyBuilderException("Processor " + currentNode.name() + " has no access to StateStore " + name); } - return stateMgr.getStore(name); - } - - @Override - public ThreadCache getCache() { - return cache; - } - - /** - * @throws IllegalStateException if the task's record is null - */ - @Override - public String topic() { - if (recordContext == null) - throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed"); - - String topic = recordContext.topic(); - - if (topic.equals(NONEXIST_TOPIC)) - return null; - else - return topic; - } - - /** - * @throws IllegalStateException if partition is null - */ - @Override - public int partition() { - if (recordContext == null) - throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed"); - - return recordContext.partition(); - } - - /** - * @throws IllegalStateException if offset is null - */ - @Override - public long offset() { - if (recordContext == null) - throw new IllegalStateException("This should not happen as offset() should only be 
called while a record is processed"); - - return recordContext.offset(); - } + final StateStore global = stateManager.getGlobalStore(name); + if (global != null) { + return global; + } - /** - * @throws IllegalStateException if timestamp is null - */ - @Override - public long timestamp() { - if (recordContext == null) - throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed"); + if (!currentNode().stateStores.contains(name)) { + throw new TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name); + } - return recordContext.timestamp(); + return stateManager.getStore(name); } + @SuppressWarnings("unchecked") @Override - public <K, V> void forward(K key, V value) { - ProcessorNode previousNode = currentNode; + public <K, V> void forward(final K key, final V value) { + final ProcessorNode previousNode = currentNode(); try { - for (ProcessorNode child : (List<ProcessorNode>) currentNode.children()) { - currentNode = child; + for (ProcessorNode child : (List<ProcessorNode>) currentNode().children()) { + setCurrentNode(child); child.process(key, value); } } finally { - currentNode = previousNode; + setCurrentNode(previousNode); } } + @SuppressWarnings("unchecked") @Override - public <K, V> void forward(K key, V value, int childIndex) { - ProcessorNode previousNode = currentNode; - final ProcessorNode child = (ProcessorNode<K, V>) currentNode.children().get(childIndex); - currentNode = child; + public <K, V> void forward(final K key, final V value, final int childIndex) { + final ProcessorNode previousNode = currentNode(); + final ProcessorNode child = (ProcessorNode<K, V>) currentNode().children().get(childIndex); + setCurrentNode(child); try { child.process(key, value); } finally { - currentNode = previousNode; + setCurrentNode(previousNode); } } + @SuppressWarnings("unchecked") @Override - public <K, V> void forward(K key, V value, String childName) { - for 
(ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode.children()) { + public <K, V> void forward(final K key, final V value, final String childName) { + for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) { if (child.name().equals(childName)) { - ProcessorNode previousNode = currentNode; - currentNode = child; + ProcessorNode previousNode = currentNode(); + setCurrentNode(child); try { child.process(key, value); return; } finally { - currentNode = previousNode; + setCurrentNode(previousNode); } } } @@ -238,37 +123,8 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol } @Override - public void schedule(long interval) { + public void schedule(final long interval) { task.schedule(interval); } - @Override - public Map<String, Object> appConfigs() { - return config.originals(); - } - - @Override - public Map<String, Object> appConfigsWithPrefix(String prefix) { - return config.originalsWithPrefix(prefix); - } - - @Override - public void setRecordContext(final RecordContext recordContext) { - this.recordContext = recordContext; - } - - @Override - public RecordContext recordContext() { - return this.recordContext; - } - - @Override - public void setCurrentNode(final ProcessorNode currentNode) { - this.currentNode = currentNode; - } - - @Override - public ProcessorNode currentNode() { - return currentNode; - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index f165ebf..b66e3df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -37,7 +37,7 @@ public class ProcessorNode<K, V> { private final String name; private final Processor<K, V> processor; - protected NodeMetrics nodeMetrics; + NodeMetrics nodeMetrics; private Time time; private K key; http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index c81df6c..dca8192 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -46,7 +46,7 @@ import java.util.Set; import static java.util.Collections.singleton; -public class ProcessorStateManager { +public class ProcessorStateManager implements StateManager { private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class); @@ -59,6 +59,7 @@ public class ProcessorStateManager { private final Map<String, TopicPartition> partitionForTopic; private final File baseDir; private final Map<String, StateStore> stores; + private final Map<String, StateStore> globalStores; private final Set<String> loggingEnabled; private final Consumer<byte[], byte[]> restoreConsumer; private final Map<TopicPartition, Long> restoredOffsets; @@ -89,6 +90,7 @@ public class ProcessorStateManager { this.partitionForTopic.put(source.topic(), source); } this.stores = new LinkedHashMap<>(); + this.globalStores = new HashMap<>(); this.loggingEnabled = new HashSet<>(); this.restoreConsumer = restoreConsumer; this.restoredOffsets = new HashMap<>(); @@ -321,6 +323,7 @@ public class ProcessorStateManager { 
return stores.get(name); } + @Override public void flush(final InternalProcessorContext context) { if (!this.stores.isEmpty()) { log.debug("{} Flushing all stores registered in the state manager", logPrefix); @@ -342,6 +345,7 @@ public class ProcessorStateManager { /** * @throws IOException if any error happens when closing the state stores */ + @Override public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException { try { // attempting to close the stores, just in case they @@ -398,4 +402,14 @@ public class ProcessorStateManager { return partition == null ? defaultPartition : partition.partition(); } + + void registerGlobalStateStores(final List<StateStore> stateStores) { + for (StateStore stateStore : stateStores) { + globalStores.put(stateStore.name(), stateStore); + } + } + + public StateStore getGlobalStore(final String name) { + return globalStores.get(name); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 9ccc252..1eff351 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -31,19 +31,22 @@ public class ProcessorTopology { private final List<StateStore> stateStores; private final Map<String, String> sourceStoreToSourceTopic; private final Map<StateStore, ProcessorNode> storeToProcessorNodeMap; - - public ProcessorTopology(List<ProcessorNode> processorNodes, - Map<String, SourceNode> sourceByTopics, - Map<String, SinkNode> sinkByTopics, - List<StateStore> stateStores, - Map<String, String> sourceStoreToSourceTopic, - 
Map<StateStore, ProcessorNode> storeToProcessorNodeMap) { + private final List<StateStore> globalStateStores; + + public ProcessorTopology(final List<ProcessorNode> processorNodes, + final Map<String, SourceNode> sourceByTopics, + final Map<String, SinkNode> sinkByTopics, + final List<StateStore> stateStores, + final Map<String, String> sourceStoreToSourceTopic, + final Map<StateStore, ProcessorNode> storeToProcessorNodeMap, + final List<StateStore> globalStateStores) { this.processorNodes = Collections.unmodifiableList(processorNodes); this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics); this.sinkByTopics = Collections.unmodifiableMap(sinkByTopics); this.stateStores = Collections.unmodifiableList(stateStores); this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; this.storeToProcessorNodeMap = Collections.unmodifiableMap(storeToProcessorNodeMap); + this.globalStateStores = Collections.unmodifiableList(globalStateStores); } public Set<String> sourceTopics() { @@ -86,6 +89,11 @@ public class ProcessorTopology { return storeToProcessorNodeMap; } + + public List<StateStore> globalStateStores() { + return globalStateStores; + } + private String childrenToString(String indent, List<ProcessorNode<?, ?>> children) { if (children == null || children.isEmpty()) { return ""; http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java new file mode 100644 index 0000000..1129a71 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +interface RecordDeserializer { + ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index a40b9ff..077a4d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -19,15 +19,12 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TimestampExtractor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
java.util.ArrayDeque; -import static java.lang.String.format; /** * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the @@ -39,20 +36,26 @@ public class RecordQueue { private static final Logger log = LoggerFactory.getLogger(RecordQueue.class); private final SourceNode source; + private final TimestampExtractor timestampExtractor; private final TopicPartition partition; private final ArrayDeque<StampedRecord> fifoQueue; private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker; + private final RecordDeserializer recordDeserializer; private long partitionTime = TimestampTracker.NOT_KNOWN; - public RecordQueue(TopicPartition partition, SourceNode source) { + RecordQueue(final TopicPartition partition, + final SourceNode source, + final TimestampExtractor timestampExtractor) { this.partition = partition; this.source = source; - + this.timestampExtractor = timestampExtractor; this.fifoQueue = new ArrayDeque<>(); this.timeTracker = new MinTimestampTracker<>(); + this.recordDeserializer = new SourceNodeRecordDeserializer(source); } + /** * Returns the corresponding source node in the topology * @@ -75,36 +78,13 @@ public class RecordQueue { * Add a batch of {@link ConsumerRecord} into the queue * * @param rawRecords the raw records - * @param timestampExtractor TimestampExtractor * @return the size of this queue */ - public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords, TimestampExtractor timestampExtractor) { + public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) { - // deserialize the raw record, extract the timestamp and put into the queue - final Object key; - try { - key = source.deserializeKey(rawRecord.topic(), rawRecord.key()); - } catch (Exception e) { - throw new StreamsException(format("Failed to deserialize key for record. 
topic=%s, partition=%d, offset=%d", - rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e); - } - - final Object value; - try { - value = source.deserializeValue(rawRecord.topic(), rawRecord.value()); - } catch (Exception e) { - throw new StreamsException(format("Failed to deserialize value for record. topic=%s, partition=%d, offset=%d", - rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e); - } - - ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), - rawRecord.timestamp(), TimestampType.CREATE_TIME, - rawRecord.checksum(), - rawRecord.serializedKeySize(), - rawRecord.serializedValueSize(), key, value); + ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord); long timestamp = timestampExtractor.extract(record, timeTracker.get()); - - log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record); + log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record); // drop message if TS is invalid, i.e., negative if (timestamp < 0) { @@ -112,7 +92,6 @@ public class RecordQueue { } StampedRecord stampedRecord = new StampedRecord(record, timestamp); - fifoQueue.addLast(stampedRecord); timeTracker.addElement(stampedRecord); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java new file mode 100644 index 0000000..d70af25 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java @@ -0,0 +1,57 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.streams.errors.StreamsException; + +import static java.lang.String.format; + +class SourceNodeRecordDeserializer implements RecordDeserializer { + private final SourceNode sourceNode; + + SourceNodeRecordDeserializer(final SourceNode sourceNode) { + this.sourceNode = sourceNode; + } + + @Override + public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) { + final Object key; + try { + key = sourceNode.deserializeKey(rawRecord.topic(), rawRecord.key()); + } catch (Exception e) { + throw new StreamsException(format("Failed to deserialize key for record. topic=%s, partition=%d, offset=%d", + rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e); + } + + final Object value; + try { + value = sourceNode.deserializeValue(rawRecord.topic(), rawRecord.value()); + } catch (Exception e) { + throw new StreamsException(format("Failed to deserialize value for record. 
topic=%s, partition=%d, offset=%d", + rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e); + } + + return new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), + rawRecord.timestamp(), TimestampType.CREATE_TIME, + rawRecord.checksum(), + rawRecord.serializedKeySize(), + rawRecord.serializedValueSize(), key, value); + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 86bdc19..954c710 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -19,20 +19,17 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; -import java.io.File; import java.util.Collections; import java.util.Map; -public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier { +class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { private static final RecordCollector 
NO_OP_COLLECTOR = new RecordCollector() { @Override @@ -61,51 +58,17 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle } }; - private final TaskId id; - private final String applicationId; - private final StreamsMetrics metrics; - private final ProcessorStateManager stateMgr; - - private final StreamsConfig config; - private final Serde<?> keySerde; - private final Serde<?> valSerde; - private final ThreadCache zeroSizedCache; - - private boolean initialized; - - public StandbyContextImpl(TaskId id, - String applicationId, - StreamsConfig config, - ProcessorStateManager stateMgr, - StreamsMetrics metrics) { - this.id = id; - this.applicationId = applicationId; - this.metrics = metrics; - this.stateMgr = stateMgr; - - this.config = config; - this.keySerde = config.keySerde(); - this.valSerde = config.valueSerde(); - this.zeroSizedCache = new ThreadCache("zeroCache", 0, this.metrics); - this.initialized = false; + public StandbyContextImpl(final TaskId id, + final String applicationId, + final StreamsConfig config, + final ProcessorStateManager stateMgr, + final StreamsMetrics metrics) { + super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics)); } - public void initialized() { - this.initialized = true; - } - public ProcessorStateManager getStateMgr() { - return stateMgr; - } - - @Override - public TaskId taskId() { - return id; - } - - @Override - public String applicationId() { - return applicationId; + StateManager getStateMgr() { + return stateManager; } @Override @@ -113,37 +76,6 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle return NO_OP_COLLECTOR; } - @Override - public Serde<?> keySerde() { - return this.keySerde; - } - - @Override - public Serde<?> valueSerde() { - return this.valSerde; - } - - @Override - public File stateDir() { - return stateMgr.baseDir(); - } - - @Override - public StreamsMetrics metrics() { - return metrics; - } - - /** - * 
@throws IllegalStateException - */ - @Override - public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { - if (initialized) - throw new IllegalStateException("Can only create state stores during initialization."); - - stateMgr.register(store, loggingEnabled, stateRestoreCallback); - } - /** * @throws UnsupportedOperationException */ @@ -152,11 +84,6 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks."); } - @Override - public ThreadCache getCache() { - return zeroSizedCache; - } - /** * @throws UnsupportedOperationException */ @@ -229,15 +156,6 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks."); } - @Override - public Map<String, Object> appConfigs() { - return config.originals(); - } - - @Override - public Map<String, Object> appConfigsWithPrefix(String prefix) { - return config.originalsWithPrefix(prefix); - } @Override public RecordContext recordContext() { http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index a48ec59..d264b26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -47,6 +47,9 @@ public class StateDirectory { private final HashMap<TaskId, FileChannel> channels = new HashMap<>(); private final HashMap<TaskId, FileLock> locks 
= new HashMap<>(); + private FileChannel globalStateChannel; + private FileLock globalStateLock; + public StateDirectory(final String applicationId, final String stateDirConfig) { final File baseDir = new File(stateDirConfig); if (!baseDir.exists() && !baseDir.mkdirs()) { @@ -75,6 +78,15 @@ public class StateDirectory { return taskDir; } + public File globalStateDir() { + final File dir = new File(stateDir, "global"); + if (!dir.exists() && !dir.mkdir()) { + throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", + dir.getPath())); + } + return dir; + } + /** * Get the lock for the {@link TaskId}s directory if it is available * @param taskId @@ -100,6 +112,49 @@ public class StateDirectory { return false; } + final FileLock lock = tryLock(retry, channel); + if (lock != null) { + locks.put(taskId, lock); + } + return lock != null; + } + + public boolean lockGlobalState(final int retry) throws IOException { + if (globalStateLock != null) { + return true; + } + + final File lockFile = new File(globalStateDir(), LOCK_FILE_NAME); + final FileChannel channel; + try { + channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); + } catch (NoSuchFileException e) { + // FileChannel.open(..) could throw NoSuchFileException when there is another thread + // concurrently deleting the parent directory (i.e. the directory of the taskId) of the lock + // file, in this case we will return immediately indicating locking failed. 
+ return false; + } + final FileLock fileLock = tryLock(retry, channel); + if (fileLock == null) { + channel.close(); + return false; + } + globalStateChannel = channel; + globalStateLock = fileLock; + return true; + } + + public void unlockGlobalState() throws IOException { + if (globalStateLock == null) { + return; + } + globalStateLock.release(); + globalStateChannel.close(); + globalStateLock = null; + globalStateChannel = null; + } + + private FileLock tryLock(int retry, final FileChannel channel) throws IOException { FileLock lock = tryAcquireLock(channel); while (lock == null && retry > 0) { try { @@ -110,12 +165,11 @@ public class StateDirectory { retry--; lock = tryAcquireLock(channel); } - if (lock != null) { - locks.put(taskId, lock); - } - return lock != null; + return lock; } + + /** * Unlock the state directory for the given {@link TaskId} * @param taskId @@ -196,4 +250,7 @@ public class StateDirectory { return null; } } + + + } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java new file mode 100644 index 0000000..7343c85 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +interface StateManager { + File baseDir(); + + void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback); + + void flush(InternalProcessorContext context); + + void close(Map<TopicPartition, Long> offsets) throws IOException; + + StateStore getGlobalStore(final String name); + + StateStore getStore(final String name); + + Map<TopicPartition, Long> checkpointedOffsets(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 07ae761..b714221 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -114,15 +114,17 @@ public class StreamTask extends AbstractTask implements Punctuator { // to corresponding source nodes in the processor topology partitionQueues = new HashMap<>(); + TimestampExtractor timestampExtractor 
= config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); + for (TopicPartition partition : partitions) { SourceNode source = topology.source(partition.topic()); - RecordQueue queue = createRecordQueue(partition, source); + RecordQueue queue = createRecordQueue(partition, source, timestampExtractor); partitionQueues.put(partition, queue); } this.logPrefix = String.format("task [%s]", id); - TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); + this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor); // initialize the consumed offset cache @@ -137,8 +139,9 @@ public class StreamTask extends AbstractTask implements Punctuator { // initialize the state stores log.info("{} Initializing state stores", logPrefix); initializeStateStores(); + stateMgr.registerGlobalStateStores(topology.globalStateStores()); initTopology(); - ((ProcessorContextImpl) this.processorContext).initialized(); + this.processorContext.initialized(); } /** @@ -379,8 +382,9 @@ public class StreamTask extends AbstractTask implements Punctuator { return recordCollector.offsets(); } - private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) { - return new RecordQueue(partition, source); + @SuppressWarnings("unchecked") + private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source, final TimestampExtractor timestampExtractor) { + return new RecordQueue(partition, source, timestampExtractor); } private ProcessorRecordContext createRecordContext(final StampedRecord currRecord) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 5641849..38961f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -202,7 +202,6 @@ public class StreamThread extends Thread { private final StreamsMetricsThreadImpl streamsMetrics; final StateDirectory stateDirectory; private String originalReset; - private StreamPartitionAssignor partitionAssignor = null; private boolean cleanRun = false; private long timerStartedMs; @@ -285,7 +284,8 @@ public class StreamThread extends Thread { UUID processId, Metrics metrics, Time time, - StreamsMetadataState streamsMetadataState) { + StreamsMetadataState streamsMetadataState, + final long cacheSizeBytes) { super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement()); this.applicationId = applicationId; String threadName = getName(); @@ -303,8 +303,6 @@ public class StreamThread extends Thread { if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { log.warn("Negative cache size passed in thread [{}]. 
Reverting to cache size of 0 bytes.", threadName); } - long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / - config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)); this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics); @@ -425,6 +423,8 @@ public class StreamThread extends Thread { removeStreamTasks(); removeStandbyTasks(); + // clean up global tasks + log.info("{} Stream thread shutdown complete", logPrefix); setState(State.NOT_RUNNING); streamsMetrics.removeAllSensors(); @@ -1108,7 +1108,7 @@ public class StreamThread extends Thread { } /** - * This class extends {@link #StreamsMetricsImpl(Metrics, String, String, Map)} and + * This class extends {@link StreamsMetricsImpl(Metrics, String, String, Map)} and * overrides one of its functions for efficiency */ private class StreamsMetricsThreadImpl extends StreamsMetricsImpl { http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index 5494674..a59eb5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -43,12 +43,18 @@ import java.util.Set; * in a KafkaStreams application */ public class StreamsMetadataState { + public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1); private final TopologyBuilder builder; private final List<StreamsMetadata> allMetadata = new ArrayList<>(); + private final Set<String> globalStores; + private final HostInfo thisHost; private Cluster clusterMetadata; + private StreamsMetadata 
myMetadata; - public StreamsMetadataState(final TopologyBuilder builder) { + public StreamsMetadataState(final TopologyBuilder builder, final HostInfo thisHost) { this.builder = builder; + this.globalStores = builder.globalStateStores().keySet(); + this.thisHost = thisHost; } /** @@ -74,6 +80,10 @@ public class StreamsMetadataState { return Collections.emptyList(); } + if (globalStores.contains(storeName)) { + return allMetadata; + } + final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName); if (sourceTopics == null) { return Collections.emptyList(); @@ -114,6 +124,15 @@ public class StreamsMetadataState { return StreamsMetadata.NOT_AVAILABLE; } + if (globalStores.contains(storeName)) { + // global stores are on every node. if we dont' have the host info + // for this host then just pick the first metadata + if (thisHost == UNKNOWN_HOST) { + return allMetadata.get(0); + } + return myMetadata; + } + final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); if (sourceTopicsInfo == null) { return null; @@ -155,6 +174,15 @@ public class StreamsMetadataState { return StreamsMetadata.NOT_AVAILABLE; } + if (globalStores.contains(storeName)) { + // global stores are on every node. 
if we dont' have the host info + // for this host then just pick the first metadata + if (thisHost == UNKNOWN_HOST) { + return allMetadata.get(0); + } + return myMetadata; + } + SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); if (sourceTopicsInfo == null) { return null; @@ -198,7 +226,12 @@ public class StreamsMetadataState { storesOnHost.add(storeTopicEntry.getKey()); } } - allMetadata.add(new StreamsMetadata(key, storesOnHost, partitionsForHost)); + storesOnHost.addAll(globalStores); + final StreamsMetadata metadata = new StreamsMetadata(key, storesOnHost, partitionsForHost); + allMetadata.add(metadata); + if (key.equals(thisHost)) { + myMetadata = metadata; + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 6da40b7..fdb03fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -132,6 +132,9 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor @Override public synchronized V get(final K key) { validateStoreOpen(); + if (key == null) { + return null; + } final byte[] rawKey = serdes.rawKey(key); return get(rawKey); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java index 61cd950..eb57ace 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java @@ -22,13 +22,13 @@ import org.apache.kafka.streams.state.KeyValueIterator; import java.util.NoSuchElementException; -class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V> { +public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V> { private final String storeName; private final KeyValueIterator<K, V> underlying; private KeyValue<K, V> next; private volatile boolean open = true; - DelegatingPeekingKeyValueIterator(final String storeName, final KeyValueIterator<K, V> underlying) { + public DelegatingPeekingKeyValueIterator(final String storeName, final KeyValueIterator<K, V> underlying) { this.storeName = storeName; this.underlying = underlying; } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java new file mode 100644 index 0000000..4957c03 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.QueryableStoreType; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class GlobalStateStoreProvider implements StateStoreProvider { + private final Map<String, StateStore> globalStateStores; + + public GlobalStateStoreProvider(Map<String, StateStore> globalStateStores) { + this.globalStateStores = globalStateStores; + } + + @SuppressWarnings("unchecked") + @Override + public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { + final StateStore store = globalStateStores.get(storeName); + if (store == null || !queryableStoreType.accepts(store)) { + return Collections.emptyList(); + } + if (!store.isOpen()) { + throw new InvalidStateStoreException("the state store, " + storeName + ", is not open."); + } + return (List<T>) Collections.singletonList(store); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 849caa7..6dc08f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -83,6 +83,10 @@ class NamedCache { } synchronized LRUCacheEntry get(final Bytes key) { + if (key == null) { + return null; + } + final LRUNode node = getInternal(key); if (node == null) { return null; http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java index ff17e68..7d10055 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java @@ -55,10 +55,7 @@ public class OffsetCheckpoint { private final File file; private final Object lock; - /** - * @throws IOException - */ - public OffsetCheckpoint(File file) throws IOException { + public OffsetCheckpoint(File file) { this.file = file; this.lock = new Object(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java index 64dac1f..419fc28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java @@ -19,6 +19,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -27,20 +28,28 @@ import java.util.List; public class QueryableStoreProvider { private final List<StateStoreProvider> storeProviders; + private final GlobalStateStoreProvider globalStoreProvider; - public QueryableStoreProvider(final List<StateStoreProvider> storeProviders) { + public QueryableStoreProvider(final List<StateStoreProvider> storeProviders, + final GlobalStateStoreProvider globalStateStoreProvider) { this.storeProviders = new ArrayList<>(storeProviders); + this.globalStoreProvider = globalStateStoreProvider; } /** * Get a composite object wrapping the instances of the {@link StateStore} with the provided * storeName and {@link QueryableStoreType} - * @param storeName name of the store - * @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)} - * @param <T> The expected type of the returned store + * + * @param storeName name of the store + * @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)} + * @param <T> The expected type of the returned store * @return A composite object that wraps the store instances. 
*/ public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) { + final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType); + if (!globalStore.isEmpty()) { + return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName); + } final List<T> allStores = new ArrayList<>(); for (StateStoreProvider storeProvider : storeProviders) { allStores.addAll(storeProvider.stores(storeName, queryableStoreType)); http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 6f33a63..f8f0e08 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -104,6 +104,10 @@ public class ThreadCache { public LRUCacheEntry get(final String namespace, byte[] key) { numGets++; + if (key == null) { + return null; + } + final NamedCache cache = getCache(namespace); if (cache == null) { return null;
