Repository: samza Updated Branches: refs/heads/master 52ff04197 -> 965a645d8
SAMZA-1065: Change the commit order to support at least once processing when using local state store for deduping. Author: Prateek Maheshwari <[email protected]> Reviewers: Yi Pan <[email protected]>, Jagadish <jagadish1989@gmail.com> Closes #35 from prateekm/commit-order Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/965a645d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/965a645d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/965a645d Branch: refs/heads/master Commit: 965a645d84f7118cb68f2994233ed61f67dbc690 Parents: 52ff041 Author: Prateek Maheshwari <[email protected]> Authored: Fri Dec 23 15:08:55 2016 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Fri Dec 23 15:08:55 2016 -0800 ---------------------------------------------------------------------- .../org/apache/samza/container/TaskInstance.scala | 14 +++++++------- .../org/apache/samza/storage/kv/LoggedStore.scala | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/965a645d/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index b068856..26a8f5f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -191,19 +191,19 @@ class TaskInstance[T]( } def commit { - trace("Flushing state stores for taskName: %s" format taskName) - metrics.commits.inc - if (storageManager != null) { - storageManager.flush - } - trace("Flushing producers for taskName: %s" format taskName) collector.flush - trace("Committing offset 
manager for taskName: %s" format taskName) + trace("Flushing state stores for taskName: %s" format taskName) + + if (storageManager != null) { + storageManager.flush + } + + trace("Checkpointing offsets for taskName: %s" format taskName) offsetManager.checkpoint(taskName) } http://git-wip-us.apache.org/repos/asf/samza/blob/965a645d/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala index dc5cbcd..33ff41b 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala @@ -61,8 +61,8 @@ class LoggedStore[K, V]( */ def put(key: K, value: V) { metrics.puts.inc - store.put(key, value) collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, value)) + store.put(key, value) } /** @@ -70,12 +70,12 @@ class LoggedStore[K, V]( */ def putAll(entries: java.util.List[Entry[K, V]]) { metrics.puts.inc(entries.size) - store.putAll(entries) val iter = entries.iterator while (iter.hasNext) { val curr = iter.next collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, curr.getKey, curr.getValue)) } + store.putAll(entries) } /** @@ -83,8 +83,8 @@ class LoggedStore[K, V]( */ def delete(key: K) { metrics.deletes.inc - store.delete(key) collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, null)) + store.delete(key) } /** @@ -92,11 +92,11 @@ class LoggedStore[K, V]( */ def deleteAll(keys: java.util.List[K]) = { metrics.deletes.inc(keys.size) - store.deleteAll(keys) val keysIterator = keys.iterator while (keysIterator.hasNext) { collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, keysIterator.next, null)) } + store.deleteAll(keys) } def flush {
