-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19384/#review38411
-----------------------------------------------------------

samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/19384/#comment70559>

    This isn't required if we just take a StreamMetadataCache object (see comments below).


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/19384/#comment70558>

    I think I prefer having a StreamMetadataCache object passed in via the constructor, rather than using statics. It makes things more mockable. Can we make StreamMetadataCache work like a normal object, and just have SamzaContainer create one and pass it into all the TaskInstances?


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/19384/#comment70556>

    For process, window, and commit, can we just take a PartitionCoordinator here, and have SamzaContainer call TaskCoordinators.coordinatorForPartition to pass in the appropriate PartitionCoordinator?


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/19384/#comment70566>

    If we take my suggestion above about converting from TaskCoordinators to PartitionCoordinator, we lose access to isShutdownAgreed. We could have SamzaContainer pass a shutdown: Boolean parameter instead.
    
    The goal I'm trying to achieve is to keep variables in TaskInstance at the per-TaskInstance level. In the past, introducing variables shared between TaskInstances (like the ones we have in the constructor) has led to code that is really hard to reason about. When we did the 0.7.0 refactor, TaskInstance was set up to be much more careful about this.
    
    Also, I think switching isCommitRequested from global (in TaskCoordinators) to per-TaskInstance is actually a good change. If a StreamTask.process method calls coordinator.commit right now, it's not obvious that EVERY partition will commit. If the StreamTask.process method calls coordinator.commit, and you have 10 TaskInstances in the container, and you process 10 messages, you commit 100 times, not 10 times (10 TaskInstance commits for each message). This has led to performance issues in the past for containers that call commit frequently.


samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
<https://reviews.apache.org/r/19384/#comment70562>

    I think it might be better to make this just a normal class, and create an instance in SamzaContainer that gets passed everywhere. It will make everything more mockable, plus the TTL, clock, and SystemAdmins can be taken as parameters.


samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
<https://reviews.apache.org/r/19384/#comment70563>

    Do we need to lock here? I get needing to lock on write so we don't accidentally lose a CacheEntry, but on read, it seems like locking might not be required, since we're using an immutable map.


samza-core/src/main/scala/org/apache/samza/task/TaskCoordinators.scala
<https://reviews.apache.org/r/19384/#comment70560>

    TaskInstanceCoordinator might be a better name for this. SAMZA-71 is going to break the tie between a single partition and a TaskInstance, so PartitionCoordinator will be a misnomer in the future.


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/19384/#comment70569>

    Can you do one dot per line here?
    
      metadata
        .getSystemStreamPartitionMetadata
        .keys
        .map


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/19384/#comment70570>

    I think you can just do new SystemStreamPartition(systemStream, _)


samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
<https://reviews.apache.org/r/19384/#comment70568>

    Do we need this change, since we kept the shutdown method now?

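The commit arithmetic in comment70566 (10 TaskInstances, 10 messages, 100 commits instead of 10) can be checked with a small simulation. This is a hedged sketch, not Samza's code: `InstanceCoordinator` and `CommitCount` are hypothetical names, and the model assumes each message is delivered to exactly one TaskInstance (round-robin here) and that the task requests a commit on every message.

```scala
// Hypothetical per-TaskInstance coordinator (not Samza's API): the commit
// flag belongs to one TaskInstance, not to the whole container.
final class InstanceCoordinator {
  private var commitRequested = false

  // Called from StreamTask.process in this model.
  def commit(): Unit = commitRequested = true

  // Read and clear the flag, so each request triggers at most one commit.
  def takeCommitRequest(): Boolean = {
    val requested = commitRequested
    commitRequested = false
    requested
  }
}

object CommitCount {
  // Shared flag: a commit request from any task commits every TaskInstance.
  def sharedFlagCommits(instances: Int, messages: Int): Int = {
    var commits = 0
    var commitRequested = false
    for (_ <- 1 to messages) {
      commitRequested = true // the task called coordinator.commit
      if (commitRequested) {
        commits += instances // every TaskInstance in the container commits
        commitRequested = false
      }
    }
    commits
  }

  // Per-instance flag: only the instance that processed the message commits.
  def perInstanceCommits(instances: Int, messages: Int): Int = {
    val coordinators = Vector.fill(instances)(new InstanceCoordinator)
    var commits = 0
    for (m <- 0 until messages) {
      val coordinator = coordinators(m % instances) // round-robin delivery (assumption)
      coordinator.commit()
      if (coordinator.takeCommitRequest()) commits += 1
    }
    commits
  }
}
```

With 10 TaskInstances and 10 messages, `sharedFlagCommits(10, 10)` returns 100 and `perInstanceCommits(10, 10)` returns 10, matching the numbers in the comment.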
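To make comments 70562 and 70563 concrete, here is a minimal sketch of StreamMetadataCache as a plain class: the TTL, the clock, and the metadata lookup come in through the constructor, writes take a lock, and reads do not. It is an illustration under stated assumptions, not the patch's code: `StreamName`, `StreamMetadata`, `CacheEntry`, and `MetadataCache` are hypothetical stand-ins, the `fetch` function stands in for the SystemAdmins, and `fetch` is assumed to return an entry for every stream it is asked about.

```scala
// Hypothetical stand-ins for Samza's SystemStream / SystemStreamMetadata,
// so the sketch compiles without Samza on the classpath.
final case class StreamName(system: String, stream: String)
final case class StreamMetadata(partitionCount: Int)
final case class CacheEntry(metadata: StreamMetadata, fetchedAt: Long)

// A plain class (no statics): the TTL, the clock, and the lookup function
// (standing in for SystemAdmin) are constructor parameters, so a test can
// inject a fake clock and a fake lookup.
class MetadataCache(
    fetch: Set[StreamName] => Map[StreamName, StreamMetadata],
    ttlMs: Long = 5000L,
    clock: () => Long = () => System.currentTimeMillis()) {

  // Immutable map published through a volatile field: a reader sees either
  // the old map or the new one, never a partially updated one.
  @volatile private var cache = Map.empty[StreamName, CacheEntry]

  def getStreamMetadata(streams: Set[StreamName]): Map[StreamName, StreamMetadata] = {
    val now = clock()
    val snapshot = cache // lock-free read of the current snapshot
    val stale = streams.filter { s =>
      snapshot.get(s).forall(entry => now - entry.fetchedAt > ttlMs)
    }
    if (stale.nonEmpty) {
      val fresh = fetch(stale)
      // Lock only around the read-modify-write, so two concurrent refreshes
      // cannot overwrite each other's entries.
      this.synchronized {
        cache = cache ++ fresh.map { case (s, m) => s -> CacheEntry(m, now) }
      }
    }
    val current = cache
    // Assumes fetch returned metadata for every requested stream.
    streams.map(s => s -> current(s).metadata).toMap
  }
}
```

One trade-off of skipping the read lock: two threads that see the same stale entry may both call `fetch`. That costs an extra lookup but never loses an entry, because the merge itself is synchronized.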
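Comments 70569 and 70570 on Util.scala combine into the shape below: one method call per line, and the placeholder `_` in place of an explicit lambda. This is a self-contained sketch; `SystemStream`, `Partition`, `SystemStreamPartition`, `MetadataStub`, `UtilSketch`, and `partitionsFor` are hypothetical stand-ins defined here so it compiles without Samza, and only the chain itself mirrors the reviewer's suggestion.

```scala
// Hypothetical stand-ins for Samza's types, so the sketch compiles on its own.
final case class SystemStream(system: String, stream: String)
final case class Partition(id: Int)
final case class SystemStreamPartition(systemStream: SystemStream, partition: Partition)

// Stand-in for SystemStreamMetadata; the value type is irrelevant here.
final class MetadataStub(partitions: Map[Partition, String]) {
  def getSystemStreamPartitionMetadata: Map[Partition, String] = partitions
}

object UtilSketch {
  // One dot per line, and `new SystemStreamPartition(systemStream, _)`
  // expands to `p => new SystemStreamPartition(systemStream, p)`.
  def partitionsFor(systemStream: SystemStream, metadata: MetadataStub): Set[SystemStreamPartition] =
    metadata
      .getSystemStreamPartitionMetadata
      .keys
      .map(new SystemStreamPartition(systemStream, _))
      .toSet
}
```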
- Chris Riccomini


On March 22, 2014, 8:18 p.m., Martin Kleppmann wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19384/
> -----------------------------------------------------------
> 
> (Updated March 22, 2014, 8:18 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-179 Add caching of system metadata lookups
> 
> 
> SAMZA-179 Rename ReadableCoordinator to TaskCoordinators; improve shutdown method signature
> 
> 
> SAMZA-179 Allow a task to detect when it has caught up, and shut down gracefully.
> 
> 
> Diffs
> -----
> 
>   build.gradle fc596267e38c53cfc44f1cf52f8d6acedc848da8
>   gradle/dependency-versions.gradle 612670dded2c2d290f1bc36ece1f3d53ffa4e971
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 611507ed340d9a3fb7b8cd5e0e0ce37b5561da32
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 192b226c9e6105cf666d484ac33bb0f854de7688
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala c101b59f3e476dcc2e3b7870d53d0d36002f2434
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c4b135c0f46edaa7fc0e4c0bf909e1ffa9515242
>   samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala aaf631ec7acd710ab8a5b288f696233223569b60
>   samza-core/src/main/scala/org/apache/samza/task/TaskCoordinators.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 5b429dfece38d877175fa0495db6450e01d82689
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala a2d5820e9eeb7590d208cf7fb7025c589f451bca
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 27b4ca5d7995536aa66f63b7329caddf41865bb4
>   samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala c45ed9bb1b3916cd4c4043e36272b4e3508bfb87
>   samza-core/src/test/scala/org/apache/samza/task/TestTaskCoordinators.scala PRE-CREATION
>   samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala 3dc263011b955cfccac83e71384b865f8fc2b722
> 
> Diff: https://reviews.apache.org/r/19384/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Martin Kleppmann
>
