> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 639
> > <https://reviews.apache.org/r/21014/diff/1/?file=573728#file573728line639>
> >
> > Moving lastCommitMs out of TaskInstance is a very subtle change in behavior that's not what we want, I think.
> >
> > Before, lastCommit was per-TaskInstance. Calling coordinator.commit from a StreamTask would result in the lastCommitMs clock getting reset. This is no longer the case: the clock is now reset only when lastCommitMs expires, even if a StreamTask calls coordinator.commit on every process call.
> >
> > If we want to maintain the same behavior as before, we still need per-TaskInstance commit clocks (even if we take them out of the TaskInstance class).
> >
> > This problem doesn't exist with window(), since there's no way to trigger a window except through the lastWindowMs timeout. No coordinator.window method exists.
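To make the behavioral difference concrete, here is a minimal Java sketch of the two commit-clock semantics being compared. The class names PerTaskCommitClock and ContainerCommitClock and their methods are hypothetical, not Samza code. Timestamps are passed in explicitly so the behavior is deterministic.

In the per-task version, any commit resets that task's clock, whether it was timed or explicitly requested through coordinator.commit. In the container-wide version, a single clock drives timed commits for every task, and explicit requests leave it untouched.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch contrasting the two commit-clock semantics; not Samza's actual classes.

/** Old behavior: one clock per task; any commit, timed or explicit, resets that task's clock. */
class PerTaskCommitClock {
    private final long commitMs;
    private final Map<String, Long> lastCommitMs = new HashMap<>();

    PerTaskCommitClock(long commitMs, long startMs, Iterable<String> taskNames) {
        this.commitMs = commitMs;
        for (String task : taskNames) {
            lastCommitMs.put(task, startMs);
        }
    }

    /** Returns true if the task should commit now; explicitRequest models coordinator.commit. */
    boolean shouldCommit(String task, long nowMs, boolean explicitRequest) {
        boolean due = explicitRequest || nowMs - lastCommitMs.get(task) >= commitMs;
        if (due) {
            lastCommitMs.put(task, nowMs); // resets this task's clock only
        }
        return due;
    }
}

/** Proposed behavior: one clock for the whole container; explicit commits never touch it. */
class ContainerCommitClock {
    private final long commitMs;
    private long lastCommitMs;

    ContainerCommitClock(long commitMs, long startMs) {
        this.commitMs = commitMs;
        this.lastCommitMs = startMs;
    }

    /** Checked once per run-loop pass; when true, every task in the container commits together. */
    boolean timerExpired(long nowMs) {
        if (nowMs - lastCommitMs >= commitMs) {
            lastCommitMs = nowMs;
            return true;
        }
        return false;
    }
}
```

Take commitMs = 1000 and a task that explicitly commits at t = 500. Under the per-task clock, its next timed commit is at t = 1500. Under the container-wide clock, it is at t = 1000.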
Hmm, not sure about this. I would argue that it's nicer if the clock-triggered commits and the explicitly triggered commits are independent of each other, i.e. if explicitly requesting a commit does not reset the timer. The code is simpler if we don't have to keep track of a separate timer for each TaskInstance. Performance will probably be better too, since we avoid iterating over all task instances on every pass through the run loop. For clock-triggered commits, it also seems more natural to me if all TaskInstances in the same container commit at the same time, so that we capture a snapshot of the entire container's state at one point in time.

So I agree that this is a change of semantics, but I would argue that it's a change for the better. But maybe there's a compelling argument in favour of the old semantics that I'm missing?


> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java, line 23
> > <https://reviews.apache.org/r/21014/diff/1/?file=573727#file573727line23>
> >
> > What do you think about supporting the same style of commit that shutdown has (i.e. commit all/commit partition)? Since you rolled in some of the SAMZA-23 work, it seems like we should discuss this part of the commit behavior.
> >
> > If we went this route, we might be able to generalize the ShutdownMethod enum a bit to work for both commit and shutdown.
> >
> > I'd have to see how it would look in practice, but commit has a very similar usage pattern: sometimes you want to commit all tasks in the container (e.g. before a long pause), but usually you just want to commit your partition.

Yeah, I think this would be worth a try. It would be nice to have symmetry in the API here. In that case, would you mind if I removed the parameterless commit() and shutdown() methods from TaskCoordinator? It would be a compatibility-breaking change, but it would remove clutter from the API.
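Here is a rough sketch of what a shared scope enum for commit() and shutdown() might look like, generalizing the ShutdownMethod idea to both calls. Everything here is hypothetical: CoordinatorScope, ScopedCoordinator, RecordingCoordinator, CoordinatorRequests and ShutdownTracker are invented for illustration.

The shutdown rule is also an assumption about the consensus semantics, not a description of the patch:
- THIS_TASK is a vote that takes effect once every task in the container has voted.
- ALL_TASKS shuts the container down immediately.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical API sketch: one enum shared by commit() and shutdown(), in the spirit of
// generalizing ShutdownMethod. None of these names come from the patch under review.
enum CoordinatorScope { THIS_TASK, ALL_TASKS }

interface ScopedCoordinator {
    void commit(CoordinatorScope scope);
    void shutdown(CoordinatorScope scope);
}

/** Records what one task asked for during a single process()/window() call. */
class RecordingCoordinator implements ScopedCoordinator {
    final String taskName;
    CoordinatorScope commitRequest;   // null means no commit was requested
    CoordinatorScope shutdownRequest; // null means no shutdown was requested

    RecordingCoordinator(String taskName) {
        this.taskName = taskName;
    }

    @Override
    public void commit(CoordinatorScope scope) {
        commitRequest = broadest(commitRequest, scope);
    }

    @Override
    public void shutdown(CoordinatorScope scope) {
        shutdownRequest = broadest(shutdownRequest, scope);
    }

    // If a task asks more than once in a single call, keep the broadest scope.
    private static CoordinatorScope broadest(CoordinatorScope previous, CoordinatorScope next) {
        return previous == CoordinatorScope.ALL_TASKS ? previous : next;
    }
}

final class CoordinatorRequests {
    private CoordinatorRequests() {}

    /** Everyone commits if any task asked for ALL_TASKS; otherwise only the tasks that asked. */
    static Set<String> tasksToCommit(Iterable<RecordingCoordinator> requests, Set<String> allTasks) {
        Set<String> result = new HashSet<>();
        for (RecordingCoordinator request : requests) {
            if (request.commitRequest == CoordinatorScope.ALL_TASKS) {
                return new HashSet<>(allTasks);
            }
            if (request.commitRequest == CoordinatorScope.THIS_TASK) {
                result.add(request.taskName);
            }
        }
        return result;
    }
}

/** Assumed consensus rule: ALL_TASKS stops the container at once; THIS_TASK is a vote. */
class ShutdownTracker {
    private final Set<String> allTasks;
    private final Set<String> votes = new HashSet<>();
    private boolean shutdownNow;

    ShutdownTracker(Set<String> allTasks) {
        this.allTasks = new HashSet<>(allTasks);
    }

    void record(RecordingCoordinator request) {
        if (request.shutdownRequest == CoordinatorScope.ALL_TASKS) {
            shutdownNow = true;
        } else if (request.shutdownRequest == CoordinatorScope.THIS_TASK) {
            votes.add(request.taskName);
        }
    }

    boolean shouldShutdown() {
        return shutdownNow || votes.containsAll(allTasks);
    }
}
```

Keeping the broadest scope when a task requests a commit twice in one call is just one possible policy. The real change may resolve repeated requests differently.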
> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 475
> > <https://reviews.apache.org/r/21014/diff/1/?file=573728#file573728line475>
> >
> > I like the idea of moving this stuff out of TaskInstance, but I'd like to keep as much logic out of SamzaContainer as possible. In 0.6.0, we had a ton of logic in the TaskRunner (equivalent to SamzaContainer), and it turned into a total mess.
> >
> > Can we move all of the code you've added to SamzaContainer into a separate class (CoordinatorCoordinator? :P kidding...), and just have the container use that class in the appropriate spots? The new class could:
> >
> > 1. handle ReadableCoordinator creation before each process/window/commit call.
> > 2. implement window/commit logic.
> > 3. contain checkCoordinator.
> > 4. keep all state for windowing/committing (these variables).
> >
> > The goal would be to remove all mentions of coordinator from the SamzaContainer, and put them all in this new class.

I like this idea: it will also make the logic a lot easier to test (SamzaContainer has so many constructor parameters that it's quite painful to test). I'm extracting it into org.apache.samza.container.RunLoop, and will update this RB when done.


> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 168
> > <https://reviews.apache.org/r/21014/diff/1/?file=573729#file573729line168>
> >
> > Can still keep this block around since the task might not be windowable.

Is a metric like metrics.windowsSkipped at all useful? Whether or not a task is windowable isn't going to change at runtime, so the metric only counts how many times a no-op method has been called. That seems of little value.
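The proposed split might look roughly like the following Java sketch. RunLoopSketch, SketchTask and Coordinator are hypothetical stand-ins. The actual RunLoop is being written in Scala, and its API isn't shown in this thread.

The sketch maps onto the four responsibilities listed in the review comment:
1. A fresh coordinator is created for each process/window call.
2. Window and commit timers are implemented here.
3. A checkCoordinator step acts on each call's requests.
4. All timing state lives in this class.

It calls window() only on windowable tasks and counts nothing for the others, in line with the windowsSkipped point. For brevity, a shutdown request just sets a flag the caller can poll; consensus shutdown is left out.

```java
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a task; not Samza's StreamTask/WindowableTask interfaces.
interface SketchTask {
    boolean isWindowable(); // fixed for the task's lifetime
    void process(String message, Coordinator coordinator);
    void window(Coordinator coordinator);
    void commit();
}

/** Records one call's requests; a fresh instance per call keeps requests from leaking between calls. */
class Coordinator {
    boolean commitRequested;
    boolean shutdownRequested;

    void commit() { commitRequested = true; }
    void shutdown() { shutdownRequested = true; }
}

/** Owns all window/commit state so the container never has to mention a coordinator. */
class RunLoopSketch {
    private final List<SketchTask> tasks;
    private final LongSupplier clock;
    private final long windowMs;
    private final long commitMs;
    private long lastWindowMs;
    private long lastCommitMs;
    private boolean shutdownRequested;

    RunLoopSketch(List<SketchTask> tasks, LongSupplier clock, long windowMs, long commitMs) {
        this.tasks = tasks;
        this.clock = clock;
        this.windowMs = windowMs;
        this.commitMs = commitMs;
        this.lastWindowMs = clock.getAsLong();
        this.lastCommitMs = lastWindowMs;
    }

    /** One pass of the loop; target is null when nothing was polled. */
    void step(SketchTask target, String message) {
        if (target != null) {
            Coordinator coordinator = new Coordinator(); // responsibility 1
            target.process(message, coordinator);
            checkCoordinator(target, coordinator);
        }
        long now = clock.getAsLong();
        if (now - lastWindowMs >= windowMs) { // responsibility 2: container-wide window timer
            lastWindowMs = now;
            for (SketchTask task : tasks) {
                if (!task.isWindowable()) {
                    continue; // nothing to do and nothing worth counting
                }
                Coordinator coordinator = new Coordinator();
                task.window(coordinator);
                checkCoordinator(task, coordinator);
            }
        }
        if (now - lastCommitMs >= commitMs) { // responsibility 2: container-wide commit timer
            lastCommitMs = now;
            for (SketchTask task : tasks) {
                task.commit();
            }
        }
    }

    /** Responsibility 3: act on whatever the task asked for during the call that just returned. */
    private void checkCoordinator(SketchTask task, Coordinator coordinator) {
        if (coordinator.commitRequested) {
            task.commit(); // explicit commit; deliberately leaves lastCommitMs alone
        }
        if (coordinator.shutdownRequested) {
            shutdownRequested = true;
        }
    }

    boolean isShutdownRequested() { return shutdownRequested; }
}
```

Injecting the clock as a LongSupplier is one way to get the testability mentioned in the reply. A test can advance time by hand instead of constructing a full SamzaContainer.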
> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 214
> > <https://reviews.apache.org/r/21014/diff/1/?file=573729#file573729line214>
> >
> > Can remove this from TaskInstanceMetrics.

Ack.


- Martin


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21014/#review42168
-----------------------------------------------------------


On May 2, 2014, 5:48 p.m., Martin Kleppmann wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21014/
> -----------------------------------------------------------
>
> (Updated May 2, 2014, 5:48 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-253: Consensus shutdown API
>
>
> Diffs
> -----
>
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 192b226c9e6105cf666d484ac33bb0f854de7688
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 24364f4ad967eec9474225604b9cc4f830cc3b2e
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c4b135c0f46edaa7fc0e4c0bf909e1ffa9515242
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala aaf631ec7acd710ab8a5b288f696233223569b60
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 27b4ca5d7995536aa66f63b7329caddf41865bb4
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala c45ed9bb1b3916cd4c4043e36272b4e3508bfb87
>
> Diff: https://reviews.apache.org/r/21014/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Martin Kleppmann
>
