> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote: > > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, > > line 230 > > <https://reviews.apache.org/r/48459/diff/3/?file=1416302#file1416302line230> > > > > nit: Doc can be improved by explaining why this is more efficient. That > > will be more informative. Or remove it altogther.
The javaDoc for the method explains why it is more efficient. I'll elaborate just a little here. > On June 14, 2016, 8:11 p.m., Navina Ramesh wrote: > > samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala, > > line 67 > > <https://reviews.apache.org/r/48459/diff/3/?file=1416303#file1416303line67> > > > > Seems like we need a more precise interface for accessing stream's > > metadata cache. > > An interface that will allow specifying list of ssps and the cacheTTL. > > > > Note to self: Make changes to systemAdmin interface after the next > > release :) Yeah, TTL is uniquely important to the partition count monitor because an infinite metadata cache would prevent it from detecting partition changes. Everywhere else, a longer cache is better. Your note to self is reflected at the top of the file. :-) > On June 14, 2016, 8:11 p.m., Navina Ramesh wrote: > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, > > line 281 > > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line281> > > > > nit: more accurate documentation here. You can move it out of the > > function definition I actually didn't intend to leave this comment in. The older version had no retry, but this version retries in order to start with an infinite cache TTL and then shorten it if there's a failure. > On June 14, 2016, 8:11 p.m., Navina Ramesh wrote: > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, > > line 284 > > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line284> > > > > I don't like what we are doing here - setting the ttl to be max value. > > However, I don't see a better way of doing it. :( There's no reason to fetch metadata if everything is working, so initially we want to use the cache no matter what. If there's a problem, we should refresh. To accomplish this, I considered 2 ways: 1. Update the metadata cache to explicitly expire problematic entries. This would be a much more invasive change and potentially less performant. 2. Lower the cache TTL if there is a problem so the cache refreshes itself. Both rely on a large or infinite initial TTL. > On June 14, 2016, 8:11 p.m., Navina Ramesh wrote: > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, > > line 324 > > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line324> > > > > I strongly feel the need to make retries as a part of the > > ExponentialSleepStrategy. It will certainly be used in many places. > > Considering the urgency of this fix and the fact that the runLoop is in > > Scale, we can table it for another day. There is a separate, unrelated patch to enforce max retries on the strategy, but even that wouldn't change the logic here because we want to control the TTL and print meaningful log messages based on the remaining number of retries. > On June 14, 2016, 8:11 p.m., Navina Ramesh wrote: > > samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala, line > > 192 > > <https://reviews.apache.org/r/48459/diff/3/?file=1416309#file1416309line192> > > > > nit: Do we really need these trace lines? If we do, can you change it > > to something like: "Begin flush to store" and "Flush to store complete" They could be submitted in a separate patch, but they are useful trace messages. The original message only tells us that the flush method was called but doesn't help to trace the calls in the various KV Store wrappers. These new messages can also be used to time the operation. I don't see any value to making the messages more verbose. They currently read like this: 2016-06-14 21:31:19.171 [main] [SerializedKeyValueStore] [TRACE] Flushing store. 2016-06-14 21:31:19.171 [main] [LoggedStore] [TRACE] Flushing store. 2016-06-14 21:31:19.171 [main] [RocksDbKeyValueStore] [TRACE] Flushing store: pending-requests. 2016-06-14 21:31:19.172 [main] [RocksDbKeyValueStore] [TRACE] Flushed store: pending-requests. 2016-06-14 21:31:19.172 [main] [LoggedStore] [TRACE] Flushed store. 2016-06-14 21:31:19.172 [main] [SerializedKeyValueStore] [TRACE] Flushed store. > On June 14, 2016, 8:11 p.m., Navina Ramesh wrote: > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, > > line 276 > > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line276> > > > > Documentation is not clear. It says "It does not retry if there is a > > failure".. But you are retrying upto "maxretries". > > > > It will be useful to define what is considered a failure, for example, > > what exceptions are ok for retry and which ones aren't. If there isn't any > > clear distinction between the exceptions, you can mention that as well Typo. I meant to say "It does not retry *indefinitely* if there is a failure. > On June 14, 2016, 8:11 p.m., Navina Ramesh wrote: > > samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java, > > line 30 > > <https://reviews.apache.org/r/48459/diff/3/?file=1416300#file1416300line30> > > > > Why do we need 2 methods for getSystemStreamPartitionCounts? I think > > you can change the method signature since this interface was introduced in > > this current version. Was that the reason you added a new method? > > > > I could be missing something very obvious here :D > > Seems like "getSystemStreamPartitionCounts(Set<String> streamNames)" is > > used only in a Mock implementation I didn't realize the old method was added in this release, so I was keeping it for backward compatibility. Removed it. Thanks for noticing. - Jake ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/48459/#review137418 ----------------------------------------------------------- On June 13, 2016, 12:22 a.m., Jake Maes wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/48459/ > ----------------------------------------------------------- > > (Updated June 13, 2016, 12:22 a.m.) > > > Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina > Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure). > > > Bugs: SAMZA-964 > https://issues.apache.org/jira/browse/SAMZA-964 > > > Repository: samza > > > Description > ------- > > SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for > logged stores > > 1. Cache metadata more aggressively. Only expire metadata if we get Kafka > exceptions. This applies for all cases EXCEPT the partition count monitor, > which uses the TTL from the StreamMetadataCache > 2. Reduce excessive Offset fetching. > 3. Do not allow unbounded exponential backoff for offset checkpointing, just > skip the offset file. Exponential backoff can balloon the commit time and > stall the event loop. So we will only retry up to 3 times for a max delay of > 400ms > 4. Add some trace log messages to help track/time KV Store flushes (the other > culprit for the slowdown) > > > Diffs > ----- > > samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java > daa2212cf1d54e90861657fab86b2e780d7e89e2 > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java > 0a6661c423a09944aa211223cad205958d3b1fee > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala > c7b05203a1958a62af9dec04b215d985c4646dc4 > samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala > 18b47ec3393978e403cadd8754f3fa5fd68654e9 > > samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala > 110c3a910aa0bae77dfe5eebbf82286b56dc4654 > > samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala > c8ea64c7c67dd6bf789d2a3445d620ccef1beac0 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala > 23aa58dff6b5e282bb634d3913cacd73003402ea > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala > 6c292234dcdd54eaca05f3e1a3fc401e205d6066 > > samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala > f0965aec5f3ec2a214dc40c70832c58273623749 > samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala > c28f8db8cb59bd5415e78535877acc1e5bee0f67 > samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala > 7bba6ff37d8266674e7f15c10c7c146f4a41fc91 > > samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala > 8e183efcdec6fd3f921fc2bfe1971c95715930ed > > Diff: https://reviews.apache.org/r/48459/diff/ > > > Testing > ------- > > New unit tests. > > ./gradlew clean build > bin/check-all.sh on my Mac > > Manual testing with 2 test jobs and the big job that had the performance > issue. > > > Thanks, > > Jake Maes > >