> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 639
> > <https://reviews.apache.org/r/21014/diff/1/?file=573728#file573728line639>
> >
> >     Moving lastCommitMs out of TaskInstance is a very subtle change in 
> > behavior that's not what we want, I think.
> >     
> >     Before, lastCommit was per-TaskInstance. Calling coordinator.commit 
> > from a StreamTask would result in the lastCommitMs clock getting reset. 
> > This is no longer the case: the clock is now only reset when the 
> > lastCommitMs timeout expires, even if a StreamTask calls 
> > coordinator.commit on every process call.
> >     
> >     If we want to maintain the same behavior as before, we still need 
> > per-TaskInstance commit clocks (even if we take them out of the 
> > TaskInstance class).
> >     
> >     This problem doesn't exist with window() since there's no way to 
> > trigger a window except through lastWindowMs timeout. No coordinator.window 
> > method exists.

Hmm, not sure about this. I would argue that it's nicer if the clock-triggered 
commits and the explicitly-triggered commits are independent of each other, 
i.e. if explicitly requesting a commit does not reset the timer.

The code is simpler if we don't have to keep track of a separate timer for each 
TaskInstance, and the performance will probably be better (saves iterating over 
all task instances every time we go round the run loop). For clock-triggered 
commits, it also seems more natural to me if all TaskInstances in the same 
container commit at the same time (so we capture a snapshot of the entire 
container's state at one point in time).

So I agree that this is a change of semantics, but I would argue that it's a 
change for the better. But maybe there's a compelling argument in favour of the 
old semantics that I'm missing?
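
To make the two semantics concrete, here is a rough sketch of both timers. It is not the code under review: CommitClockSketch, PerTaskClock, ContainerClock and their methods are hypothetical names made up for illustration, and time is passed in explicitly so the behaviour is easy to follow.

```java
// Hypothetical sketch; class and method names are illustrative, not Samza's.
final class CommitClockSketch {

    /** Old behaviour: one clock per task, reset by every commit, timed or explicit. */
    static final class PerTaskClock {
        private final long intervalMs;
        private long lastCommitMs;

        PerTaskClock(long intervalMs, long nowMs) {
            this.intervalMs = intervalMs;
            this.lastCommitMs = nowMs;
        }

        /** The task asked for a commit itself, so its clock starts over. */
        void explicitCommit(long nowMs) {
            lastCommitMs = nowMs;
        }

        /** Returns true (and resets the clock) if a timed commit is due at nowMs. */
        boolean timedCommitDue(long nowMs) {
            if (nowMs - lastCommitMs < intervalMs) {
                return false;
            }
            lastCommitMs = nowMs;
            return true;
        }
    }

    /** Proposed behaviour: one clock per container, untouched by explicit commits. */
    static final class ContainerClock {
        private final long intervalMs;
        private long lastCommitMs;

        ContainerClock(long intervalMs, long nowMs) {
            this.intervalMs = intervalMs;
            this.lastCommitMs = nowMs;
        }

        /** An explicit commit happens, but the timer keeps its own schedule. */
        void explicitCommit(long nowMs) {
            // Intentionally leaves lastCommitMs alone.
        }

        /** Returns true (and resets the clock) if a timed commit is due at nowMs. */
        boolean timedCommitDue(long nowMs) {
            if (nowMs - lastCommitMs < intervalMs) {
                return false;
            }
            lastCommitMs = nowMs;
            return true;
        }
    }

    public static void main(String[] args) {
        PerTaskClock perTask = new PerTaskClock(100, 0);
        ContainerClock container = new ContainerClock(100, 0);
        perTask.explicitCommit(90);
        container.explicitCommit(90);
        System.out.println("per-task due at t=100: " + perTask.timedCommitDue(100));
        System.out.println("container due at t=100: " + container.timedCommitDue(100));
    }
}
```

With a 100 ms interval and an explicit commit at t=90, the per-task clock is not due at t=100 (it was just reset), while the container-wide clock is.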


> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java, line 23
> > <https://reviews.apache.org/r/21014/diff/1/?file=573727#file573727line23>
> >
> >     What do you think about supporting the same style of commit that 
> > shutdown has (i.e. commit all/commit partition). Since you rolled in some 
> > of the SAMZA-23 work, it seems like we should discuss this part of the 
> > commit behavior.
> >     
> >     If we went this route, we might be able to generalize the 
> > ShutdownMethod enum a bit to work for both commit and shutdown.
> >     
> >     I'd have to see how it would look in practice, but commit has a very 
> > similar usage pattern: sometimes you want to commit all tasks in the 
> > container (e.g. before a long pause), but usually you just want to commit 
> > your partition.

Yeah, I think this would be worth a try. It would be nice to have symmetry in 
the API here.

In this case, would you mind if I remove the parameter-less commit() and 
shutdown() methods on TaskCoordinator? It would be a compatibility-breaking 
change, but would remove clutter from the API.
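
As a starting point for that discussion, a symmetric API might look roughly like the sketch below. Everything in it is an assumption: Scope, Coordinator and RecordingCoordinator are hypothetical names, not the existing TaskCoordinator or ShutdownMethod definitions, and the final shape could well differ.

```java
// Hypothetical sketch of a symmetric commit/shutdown API; none of these
// names are taken from the actual Samza code.
final class CoordinatorScopeSketch {

    /** One enum shared by commit and shutdown requests. */
    enum Scope {
        CURRENT_TASK,            // only the task (partition) making the call
        ALL_TASKS_IN_CONTAINER   // every task in the same container
    }

    /** Task-facing interface with no parameter-less variants. */
    interface Coordinator {
        void commit(Scope scope);
        void shutdown(Scope scope);
    }

    /** Records what the task asked for so the caller can inspect it afterwards. */
    static final class RecordingCoordinator implements Coordinator {
        private Scope commitRequest;    // null means no commit was requested
        private Scope shutdownRequest;  // null means no shutdown was requested

        @Override
        public void commit(Scope scope) {
            commitRequest = scope;
        }

        @Override
        public void shutdown(Scope scope) {
            shutdownRequest = scope;
        }

        Scope commitRequest() {
            return commitRequest;
        }

        Scope shutdownRequest() {
            return shutdownRequest;
        }
    }

    public static void main(String[] args) {
        RecordingCoordinator coordinator = new RecordingCoordinator();
        coordinator.commit(Scope.CURRENT_TASK);
        System.out.println("commit request: " + coordinator.commitRequest());
        System.out.println("shutdown request: " + coordinator.shutdownRequest());
    }
}
```

Dropping the parameter-less methods forces every caller to state the scope explicitly, which is where the compatibility break comes from.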


> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 475
> > <https://reviews.apache.org/r/21014/diff/1/?file=573728#file573728line475>
> >
> >     I like the idea of moving this stuff out of TaskInstance, but I'd like 
> > to keep as much logic out of SamzaContainer as possible. In 0.6.0, we had a 
> > ton of logic in the TaskRunner (equivalent to SamzaContainer), and it 
> > turned into a total mess.
> >     
> >     Can we move all of the code you've added to SamzaContainer into a 
> > separate class (CoordinatorCoordinator? :P kidding...), and just have the 
> > container use that class in the appropriate spots? The new class could:
> >     
> >     1. handle ReadableCoordinator creation before each 
> > process/window/commit call.
> >     2. implement window/commit logic
> >     3. contain checkCoordinator
> >     4. keep all state for windowing/committing (these variables).
> >     
> >     The goal would be to remove all mentions of coordinator from the 
> > SamzaContainer, and put them all in this new class.

I like this idea -- it will also make it a lot easier to test (SamzaContainer 
has so many constructor parameters that it's quite painful to test). I'm 
extracting it into org.apache.samza.container.RunLoop, and will update this RB 
when done.
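
For illustration only, the extracted class could have roughly the following shape. This is a simplified sketch, not the planned RunLoop: RunLoopSketch, its Task interface and the injected clock are hypothetical, it processes every task on each iteration, and it leaves out coordinator handling entirely. The point is that the window/commit state lives in one small class with few constructor parameters.

```java
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of a run loop that owns the window/commit timers.
final class RunLoopSketch {

    /** Minimal stand-in for a task instance; not Samza's TaskInstance. */
    interface Task {
        void process();
        void window();
        void commit();
    }

    private final List<Task> tasks;
    private final LongSupplier clock;  // injected so tests can control time
    private final long windowMs;
    private final long commitMs;
    private long lastWindowMs;
    private long lastCommitMs;

    RunLoopSketch(List<Task> tasks, LongSupplier clock, long windowMs, long commitMs) {
        this.tasks = tasks;
        this.clock = clock;
        this.windowMs = windowMs;
        this.commitMs = commitMs;
        long now = clock.getAsLong();
        this.lastWindowMs = now;
        this.lastCommitMs = now;
    }

    /** One iteration: process, then window and commit if their intervals elapsed. */
    void runOnce() {
        for (Task task : tasks) {
            task.process();
        }
        long now = clock.getAsLong();
        if (now - lastWindowMs >= windowMs) {
            for (Task task : tasks) {
                task.window();
            }
            lastWindowMs = now;
        }
        if (now - lastCommitMs >= commitMs) {
            // All tasks commit together, driven by a single container-wide timer.
            for (Task task : tasks) {
                task.commit();
            }
            lastCommitMs = now;
        }
    }
}
```

A test can then drive the loop with a fake clock (for example a lambda reading a mutable counter) and a counting Task, without constructing a whole container.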


> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
> > line 168
> > <https://reviews.apache.org/r/21014/diff/1/?file=573729#file573729line168>
> >
> >     Can still keep this block around since the task might not be windowable.

Is a metric like metrics.windowsSkipped at all useful? Whether a task is 
windowable or not isn't going to change at runtime, so the metric only counts 
how many times a no-op method has been called. Seems of little value.


> On May 5, 2014, 6:06 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
> > line 214
> > <https://reviews.apache.org/r/21014/diff/1/?file=573729#file573729line214>
> >
> >     Can remove this from TaskInstanceMetrics.

Ack.


- Martin


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21014/#review42168
-----------------------------------------------------------


On May 2, 2014, 5:48 p.m., Martin Kleppmann wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21014/
> -----------------------------------------------------------
> 
> (Updated May 2, 2014, 5:48 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-253: Consensus shutdown API
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 
> 192b226c9e6105cf666d484ac33bb0f854de7688 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 24364f4ad967eec9474225604b9cc4f830cc3b2e 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> c4b135c0f46edaa7fc0e4c0bf909e1ffa9515242 
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 
> aaf631ec7acd710ab8a5b288f696233223569b60 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 27b4ca5d7995536aa66f63b7329caddf41865bb4 
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 
> c45ed9bb1b3916cd4c4043e36272b4e3508bfb87 
> 
> Diff: https://reviews.apache.org/r/21014/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Martin Kleppmann
> 
>
