Hi Wei,

Thanks for the proposal; it looks good!
I have some clarification questions, though; hope you don't mind :)

- About consistency.
1. I guess when you talk about consistency you mean versioning, i.e.
making all tasks use a specific version (most of the time the last
visible one), right? So there is no notion of database consistency, only
versioning?
2. I agree with Jagadish's point about having a GC process that keeps the
last visible version for all tasks. Even in the case you mention of
bounded input streams, compacting/GC'ing previous versions to keep only
the latest one doesn't mean anything is wrong; it only means that our
local store holds just the latest one. Say our latest version is T and we
have tasks that are still reading T-5: then we would compact only up to
T-5, but we would still have much less data to keep.
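
To make this concrete, here is a rough sketch of the compaction rule I
have in mind (all class and method names are made up for illustration;
they are not part of the proposal):

// Illustrative sketch only: keep the oldest version still pinned by any
// running task and drop everything older. Names are hypothetical.
import java.util.Collection;

public class AdjunctStoreCompactor {

  /** Hypothetical view of whatever versioning the local store exposes. */
  public interface VersionedStore {
    void dropVersionsOlderThan(long version);
  }

  /**
   * @param latestVersion the newest fully-built version (T)
   * @param versionsInUse versions currently pinned by running tasks (e.g. T-5)
   * @param store         a versioned view of the local store
   */
  public void compact(long latestVersion,
                      Collection<Long> versionsInUse,
                      VersionedStore store) {
    // If no task pins an older version, only the latest one needs to stay.
    long oldestNeeded = versionsInUse.stream()
        .mapToLong(Long::longValue)
        .min()
        .orElse(latestVersion);
    store.dropVersionsOlderThan(oldestNeeded);
  }
}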

- About recovery/failure scenarios.
1. What happens if the key-value store fails? Do we make the tasks wait
until the key-value store comes back, or do we mark messages that arrived
while it was down as messages that need to be reprocessed?
2. What if a task that reads from such a key-value store gets restarted?
Should it just read from the last available version in the key-value
store, or should it fetch the version at which it stopped and resume from
there?

- About querying the key-value stores.
1. If we have multiple key-value stores across multiple containers and
they are not centralized, do you have any plans for how to handle
multiple versions across them? Zookeeper?


Best,

Renato M.


2017-05-20 8:01 GMT+02:00 Wei Song <weison...@gmail.com>:

> Thanks much for your comments, Jagadish!
>
> Please see inline for my response …
>
> -Wei
>
>
> On 5/19/17, 5:47 PM, "Jagadish Venkatraman" <jagadish1...@gmail.com>
> wrote:
>
>     Thanks for writing up this proposal, Wei. This will go a long way in
>     satisfying a number of Samza use-cases. I'm +1 to this idea.
>
>     >> Section on proposed changes: Provide hooks to transform an incoming
>     message to desired types (this is useful to store a subset of the
>     incoming message).
>
>     1. I believe you mean store a "projection" of the incoming message?
>     It might be clearer to re-word it, IMHO.
>   [wei] Good point; I've called out projection as the purpose.
>
>     2. Does the Adjunct Data store support change-logging? If it does not,
>     it might be worth calling that out.
>   [wei] No. Since the store is read-only and every bit of information can
>   be recovered from the original stream, it doesn't require change logs.
>   I've added a section "Recovery" to discuss recovery and explicitly call
>   this out.
>
>     >> Section on Consistency
>
>     3. IMO, adding a discussion of what causes a potential inconsistency,
>     and how we determine what a consistent snapshot is, will probably be
>     useful (for both bounded and unbounded datasets).
>   [wei] Yes, it makes sense to explain this in more detail. I've expanded
>   the section accordingly.
>
>     >> Section on Bootstrapping: A bootstrap is forced if the store is not
>     available, or not valid (too old)
>
>     4. How do we determine if a store is invalid / old? One way would be to
>   store the recent offsets somewhere, and compare the offsets upon startup.
>   [wei] This is addressed in the added "Recovery" section. My proposal is
>   to provide a timestamp-based approach in the initial implementation; if
>   we find this to be insufficient, we would add an offset-based solution.
>   You can find more details in that new section.
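
For illustration, the timestamp-based validity check could be as simple as
the sketch below (the names and the max-age setting are invented here, not
taken from the proposal):

// Sketch of a timestamp-based staleness check: reuse the local store only
// if it was updated recently enough, otherwise force a bootstrap.
public class StoreValidityCheck {

  /**
   * @param storeLastUpdatedMs wall-clock time the local store was last written
   * @param maxAgeMs           hypothetical configured maximum acceptable age
   * @return true if the store can be reused, false if a bootstrap is forced
   */
  public boolean isStoreValid(long storeLastUpdatedMs, long maxAgeMs) {
    long age = System.currentTimeMillis() - storeLastUpdatedMs;
    return age >= 0 && age <= maxAgeMs;
  }
}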
>
>     >> We may provide a default serde for POJO.
>
>     5. +1 for adding default serdes. Using Java serialization is probably
>     the simplest (mandating that keys and values contain serializable
>     fields).
>   [wei] A potential issue with this approach is that it requires
>   implementing Serializable; maybe it's worth looking at other
>   possibilities as well.
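
For illustration, a default serde built on plain Java serialization could
look roughly like the sketch below (it assumes Samza's Serde interface with
toBytes/fromBytes and, as noted above, only works for Serializable types):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.samza.SamzaException;
import org.apache.samza.serializers.Serde;

// Sketch only: a POJO serde backed by Java serialization.
public class JavaSerializationSerde<T extends Serializable> implements Serde<T> {

  @Override
  public byte[] toBytes(T object) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new SamzaException("Failed to serialize object", e);
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public T fromBytes(byte[] bytes) {
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (T) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new SamzaException("Failed to deserialize object", e);
    }
  }
}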
>
>     6. This will perhaps be clearer as we get to implementing it.
> Currently,
>     there are 3 storage managers in the proposal - "TaskStorageManager",
>     "ContainerStorageManager" and "AdjunctDataStorageManager" (different
>     from AdjunctDataStoreManager). Not entirely sure we need all 3. Maybe
>     we do.
>   [wei] I've added a section "Implementation notes" to explain their
>   responsibilities; please review.
>
>     >> After bootstrap, for change capture data, we keep updating its AD
>     store when new updates arrive
>
>     7. If used with a streaming source like Kafka, wouldn't the data
>     storage size grow without bound? Do we need to handle garbage
>     collection of really stale data? What do you think about adding a
>     section on how GC works? (both for bounded and unbounded sources)
>   [wei] There is a possibility, but it's remote. The intent of an adjunct
>   data store is querying, especially in change data capture scenarios. The
>   underlying dataset is bounded; it's just that the number of updates could
>   be unbounded. Adding trimming would break consistency without an external
>   backup store. On the other hand, users should also be mindful to use the
>   adjunct store in the intended way.
>
>     >> Configuration: stores.adstore.manager.factory
>
>     8. If the user implements their own AdjunctDataStoreManagerFactory,
>     what is the lifecycle of the returned `AdjunctDataStoreManager`?
>     AFAICS, there is no easy way for an implementor to obtain an instance
>     of a K-V store inside the AdjunctDataStoreManagerFactory interface.
>     Should the API take in a Map<String, KVStore> stores instead of a
>     Map<String, StorageEngine>?
>   [wei] The reason StorageEngine is used here is that samza-core doesn't
>   have a dependency on samza-kv, which hosts KeyValueStorageEngine. This is
>   actually the reason for making this customizable, so that users can
>   implement the interface to support non-K/V stores. I know this is
>   probably not the main use case for now. For K/V-based stores, I don't see
>   much need for customization.
>
>
> Thanks for your review, Jagadish!
>
>
>     Best,
>     Jagadish
>
>
>
>     On Fri, May 19, 2017 at 3:59 PM, Wei Song <weison...@gmail.com> wrote:
>
>     > Thanks Xinyu for your feedback. With regard to your question: when
>     > a new version of a file becomes available, we would already be in
>     > normal processing mode. Either the connector or the external system
>     > would need to inject an indication to signal the end of the current
>     > version and then continue sending the new version. The adjunct data
>     > store would recognize the indication and build a new version in the
>     > background. While the new version is being built, we continue
>     > processing incoming events from the main stream using the existing
>     > version. Once the new version is built, we switch to it, and the old
>     > version can be discarded if desired. It should be seamless from the
>     > processing perspective.
>     >
>     > On Fri, May 19, 2017 at 2:19 PM, xinyu liu <xinyuliu...@gmail.com>
> wrote:
>     >
>     > > Hi, Wei,
>     > >
>     > > +1 on the proposed design. This is going to reduce a lot of the
>     > > heavy-lifting work that user code needs to do today to bootstrap a
>     > > data stream into a local store. The configs look quite
>     > > straightforward and easy to set up. Overall the design looks great
>     > > to me.
>     > >
>     > > I have one question: in the proposal you mentioned "When Samza is
>     > > running in 24x7 mode, the stream for a bounded dataset may deliver
>     > > multiple versions." So after the bootstrap of the initial version
>     > > is done, what will happen when a new version comes? Right now, by
>     > > default, a bootstrap stream is set up with priority INT_MAX,
>     > > meaning it will preempt other streams while the bootstrap is going
>     > > on. Are we expecting pauses when a new version of adjunct data
>     > > comes in? Please let me know what the plan is to handle this
>     > > scenario.
>     > >
>     > > Thanks,
>     > > Xinyu
>     > >
>     > > On Tue, May 16, 2017 at 2:15 PM, Navina Ramesh (Apache) <
>     > nav...@apache.org
>     > > >
>     > > wrote:
>     > >
>     > > > Thanks for trying 3 times, Wei. Sorry about the trouble. Not
>     > > > sure where the problem lies. Looking forward to reviewing your
>     > > > design.
>     > > >
>     > > > Navina
>     > > >
>     > > > On Tue, May 16, 2017 at 8:56 AM, Wei Song <ws...@linkedin.com>
> wrote:
>     > > >
>     > > > > Hey everyone,
>     > > > >
>     > > > > I created a proposal for SAMZA-1278
>     > > > > <https://issues.apache.org/jira/browse/SAMZA-1278>, Adjunct
>     > > > > Data Store for Unbounded DataSets, which introduces an
>     > > > > automatic mechanism to store adjunct data for stream tasks.
>     > > > >
>     > > > > https://cwiki.apache.org/confluence/display/SAMZA/Adjunct+Data+Store+for+Unbounded+DataSets
>     > > > >
>     > > > > Please review; comments are welcome!
>     > > > >
>     > > > > Those who are not actively following the master branch may
>     > > > > have more questions than others. Feel free to ask them here.
>     > > > >
>     > > > > P.S. This is the 3rd try; I sent this last week, but
>     > > > > apparently no one at LinkedIn received it, so I'm including
>     > > > > samza-dev here just to be sure.
>     > > > >
>     > > > > --
>     > > > > Thanks,
>     > > > > -Wei
>     > > > >
>     > > >
>     > >
>     >
>
>
>
>     --
>     Jagadish V,
>     Graduate Student,
>     Department of Computer Science,
>     Stanford University
>
>
>
>
