Repository: kafka
Updated Branches:
  refs/heads/trunk 1a539c74c -> 4ee68b43c
HOTFIX: fix streams issues

* RocksDBStore.putInternal should bypass logging.
* StoreChangeLogger should not call context.recordCollector() when there is nothing to log.
  * This is for standby tasks: in a standby task, recordCollector() throws an exception, and there should be nothing to log anyway.
* Fixed ConcurrentModificationException in StreamThread.

guozhangwang

Author: Yasuhiro Matsuda <[email protected]>

Reviewers: Guozhang Wang

Closes #877 from ymatsuda/hotfix2

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ee68b43
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ee68b43
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ee68b43

Branch: refs/heads/trunk
Commit: 4ee68b43c180d1f68648c0fb388a66b1ed0023e5
Parents: 1a539c7
Author: Yasuhiro Matsuda <[email protected]>
Authored: Tue Feb 9 04:46:11 2016 +0800
Committer: Guozhang Wang <[email protected]>
Committed: Tue Feb 9 04:46:11 2016 +0800

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 21 +++++++++++++++++----
 .../streams/state/internals/RocksDBStore.java   | 12 ++++++------
 .../state/internals/StoreChangeLogger.java      |  3 +++
 3 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8948fc8..6a8eabc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -104,7 +104,7 @@ public class StreamThread extends Thread {
     private long lastCommit;
     private long recordsProcessed;
 
-    private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
 
     static File makeStateDir(String jobId, String baseDirName) {
@@ -355,18 +355,22 @@ public class StreamThread extends Thread {
         if (!standbyTasks.isEmpty()) {
             if (processStandbyRecords) {
                 if (!standbyRecords.isEmpty()) {
+                    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<>();
+
                     for (TopicPartition partition : standbyRecords.keySet()) {
-                        StandbyTask task = standbyTasksByPartition.get(partition);
-                        List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition);
+                        List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.get(partition);
                         if (remaining != null) {
+                            StandbyTask task = standbyTasksByPartition.get(partition);
                             remaining = task.update(partition, remaining);
                             if (remaining != null) {
-                                standbyRecords.put(partition, remaining);
+                                remainingStandbyRecords.put(partition, remaining);
                             } else {
                                 restoreConsumer.resume(partition);
                             }
                         }
                     }
+
+                    standbyRecords = remainingStandbyRecords;
                 }
                 processStandbyRecords = false;
             }
@@ -376,6 +380,12 @@ public class StreamThread extends Thread {
             if (!records.isEmpty()) {
                 for (TopicPartition partition : records.partitions()) {
                     StandbyTask task = standbyTasksByPartition.get(partition);
+
+                    if (task == null) {
+                        log.error("missing standby task for partition {}", partition);
+                        throw new StreamsException("missing standby task for partition " + partition);
+                    }
+
                     List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
                     if (remaining != null) {
                         restoreConsumer.pause(partition);
@@ -642,6 +652,9 @@ public class StreamThread extends Thread {
             }
             // collect checked pointed offsets to position the restore consumer
             // this include all partitions from which we restore states
+            for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
+                standbyTasksByPartition.put(partition, task);
+            }
             checkpointedOffsets.putAll(task.checkpointedOffsets());
         }
     }
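
For context on the StreamThread fix above: the old loop removed entries from standbyRecords (and conditionally re-inserted them) while iterating that same map's keySet(), the pattern HashMap rejects with ConcurrentModificationException. The patch iterates read-only, collects leftovers into a fresh remainingStandbyRecords map and swaps the field afterwards, which is also why standbyRecords loses its final modifier. Below is a minimal standalone sketch of the two patterns; StandbyRecordsSketch and the String/Integer stand-ins are illustrative, not the real TopicPartition/ConsumerRecord types.

// ---- illustrative sketch, not part of the commit ----
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StandbyRecordsSketch {
    public static void main(String[] args) {
        Map<String, List<Integer>> standbyRecords = new HashMap<>();
        standbyRecords.put("store-changelog-0", Arrays.asList(1, 2, 3));
        standbyRecords.put("store-changelog-1", new ArrayList<Integer>());

        // Old pattern: remove/put on the map while iterating its keySet().
        // HashMap's iterator detects the structural modification and throws.
        try {
            for (String partition : standbyRecords.keySet()) {
                List<Integer> remaining = standbyRecords.remove(partition);
                if (remaining != null && !remaining.isEmpty())
                    standbyRecords.put(partition, remaining);
            }
        } catch (ConcurrentModificationException e) {
            System.out.println("old pattern: " + e);
        }

        // New pattern (what the patch does): iterate read-only, collect the
        // still-unprocessed entries into a fresh map, swap the reference once.
        Map<String, List<Integer>> remainingStandbyRecords = new HashMap<>();
        for (String partition : standbyRecords.keySet()) {
            List<Integer> remaining = standbyRecords.get(partition);
            if (remaining != null && !remaining.isEmpty())
                remainingStandbyRecords.put(partition, remaining);
        }
        standbyRecords = remainingStandbyRecords;
        System.out.println("remaining: " + standbyRecords.keySet());
    }
}
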
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 556e7cd..5c57854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -201,7 +201,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public boolean persistent() {
-        return false;
+        return true;
     }
 
     @Override
@@ -241,6 +241,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             byte[] rawKey = serdes.rawKey(key);
             byte[] rawValue = serdes.rawValue(value);
             putInternal(rawKey, rawValue);
+
+            if (loggingEnabled) {
+                changeLogger.add(rawKey);
+                changeLogger.maybeLogChange(this.getter);
+            }
         }
     }
 
@@ -260,11 +265,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                                         " and value " + serdes.keyFrom(rawValue) + " from store " + this.name, e);
             }
         }
-
-        if (loggingEnabled) {
-            changeLogger.add(rawKey);
-            changeLogger.maybeLogChange(this.getter);
-        }
     }
 
     @Override


http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 3bbd522..aac4d85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -86,6 +86,9 @@ public class StoreChangeLogger<K, V> {
     }
 
     public void logChange(ValueGetter<K, V> getter) {
+        if (this.removed.isEmpty() && this.dirty.isEmpty())
+            return;
+
         RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
         if (collector != null) {
             Serializer<K> keySerializer = serialization.keySerializer();
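
On the two state-store changes: as the commit message notes, putInternal() is the write path used when restoring a store from its changelog (the standby-task case), so the changelog bookkeeping now happens only in put(), and logChange() returns before asking the context for a record collector when there is nothing to send, since a standby task's context cannot supply one. The following is a small self-contained model of that flow; ToyChangeLogger and ToyStore are hypothetical stand-ins sketched under those assumptions, not the Kafka Streams API.

// ---- illustrative sketch, not part of the commit ----
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

public class ChangelogGuardSketch {

    // Stand-in for StoreChangeLogger: tracks dirty keys, flushes them via a collector.
    static class ToyChangeLogger {
        private final Set<String> dirty = new HashSet<>();
        private final Supplier<Object> collectorSupplier;

        ToyChangeLogger(Supplier<Object> collectorSupplier) {
            this.collectorSupplier = collectorSupplier;
        }

        void add(String key) {
            dirty.add(key);
        }

        void logChange() {
            // The guard added by the patch: if nothing is dirty, never touch the
            // collector, which a standby task's context cannot provide.
            if (dirty.isEmpty())
                return;

            Object collector = collectorSupplier.get(); // would throw on a standby task
            System.out.println("flushing " + dirty + " via " + collector);
            dirty.clear();
        }
    }

    // Stand-in for RocksDBStore: put() logs, putInternal() (the restore path) does not.
    static class ToyStore {
        private final ToyChangeLogger changeLogger;

        ToyStore(ToyChangeLogger changeLogger) {
            this.changeLogger = changeLogger;
        }

        void put(String key) {
            putInternal(key);
            changeLogger.add(key);   // changelog bookkeeping only on the normal write path
            changeLogger.logChange();
        }

        void putInternal(String key) {
            System.out.println("wrote " + key + " to local state only");
        }
    }

    public static void main(String[] args) {
        // Simulate a standby task: any attempt to obtain a record collector throws.
        ToyChangeLogger logger = new ToyChangeLogger(() -> {
            throw new IllegalStateException("no record collector in a standby task");
        });
        ToyStore store = new ToyStore(logger);

        store.putInternal("restored-key"); // restoring state: bypasses logging entirely
        logger.logChange();                // nothing dirty: guard returns early, no exception
    }
}
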
