Re: Replicating Cassandra data to HDFS

2016-08-09 Thread Jonathan Haddad
My understanding is that all CDC really is right now is a stable commit log
reader. For a given mutation on an RF=3 system, you'll end up with 3 readers
that all *could* do some action.  For now let's just say "put it in a Kafka
topic" because that lets us do anything we want after that.

I suppose the most straightforward way would be to have a daemon that runs
on each Cassandra node whose job it is to see a mutation come through, then
apply a lock via LWT in order to make a best effort at only-once semantics
(but we know this is best effort and we're really going to get
at-least-once).  The node that acquired the lock (let's call it A) would
have to be responsible for marking the mutation as complete.  The other
nodes (B & C) that failed at the LWT insert would be the backup nodes to
ensure that if node A died while pushing the mutation along, they would be
able to do it.  I'm not sure how each mutation is identified at the moment;
is there a mutation ID that gets sent to each replica?  Without that you'd
need to compute some hash, I suppose, and use that as the LWT key, but of
course now you have to deal with potential collisions; maybe you'd use
{mutation_timestamp}.{mutation_hash} and call it a day.
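
A rough sketch of what that LWT claim might look like, assuming a hypothetical
claims table and the DataStax Java driver (the keyspace, table, column names,
and id format below are made up for illustration, not part of Cassandra's CDC):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

public class CdcClaimDaemon {
    // Hypothetical claim table:
    //   CREATE TABLE cdc_meta.claims (
    //       mutation_id text PRIMARY KEY,  -- e.g. {mutation_timestamp}.{mutation_hash}
    //       claimed_by  text,
    //       claimed_at  timestamp
    //   );
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("cdc_meta")) {

            String mutationId = "1470764382000.9f2c4a";  // placeholder id for this sketch
            String nodeId = "node-a";

            // LWT insert: only one replica's reader wins the claim (best effort at only-once).
            ResultSet rs = session.execute(
                "INSERT INTO claims (mutation_id, claimed_by, claimed_at) " +
                "VALUES (?, ?, toTimestamp(now())) IF NOT EXISTS",
                mutationId, nodeId);

            if (rs.one().getBool("[applied]")) {
                // We won the Paxos round: push the mutation to Kafka, then mark it complete.
            } else {
                // Another replica claimed it; hold the mutation as a backup in case that
                // node dies before pushing it along.
            }
        }
    }
}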

The problem here is that now we've added a ton of Paxos rounds to every
mutation, increasing the load on our Cassandra cluster by a significant
amount.  The alternative, which might be more performant, is to keep the
mutation state in memory, or in a local keyspace?  This has additional
issues, like what do we do when topology changes?

A third option is to assume you're going to get constant
at-least-once-probably-a-whole-bunch delivery and have all of your Kafka
consumers deal with the fact that they'll be getting RF copies of a mutation
and do the dedupe on their end.  Maybe you'd have a single topic for the raw,
duplicated data and a consumer that keeps track of what it's seen and
forwards on only a single copy?  The tradeoff here is that now you've got 3
copies of each mutation in the first topic, plus another copy that gets
forwarded on, plus you may want a durable log of what's been seen, since, you
know, servers do restart sometimes.
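
A bare-bones sketch of that dedupe consumer (the topic names are invented, and
the in-memory set is only a stand-in for the durable seen-log mentioned above):

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MutationDeduper {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "cdc-dedupe");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // In-memory stand-in for the durable "what have I seen" log; lost on restart.
        Set<String> seen = new HashSet<>();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("mutations-raw"));  // RF copies land here
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Key = mutation id (e.g. timestamp.hash); forward only the first copy seen.
                    if (seen.add(record.key())) {
                        producer.send(new ProducerRecord<>("mutations-clean", record.key(), record.value()));
                    }
                }
            }
        }
    }
}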

I'll be completely honest, I don't have a good answer to any of this.
Doing it at the Cassandra level in either of the first 2 scenarios above
feels like a mess and a lot of work, so I'd probably spend the most time
working on option 3, aka using a Kafka consumer to dedupe and produce a new,
clean topic.  I pick this not because it shines as the most elegant or
efficient option but because I feel like I'd be able to write it and have
the best chance of getting it reasonably close to correct.

Let me make sure I'm being clear on this - I would approach the development
of CDC the exact same way it's been built.  A CL reader is a logical first
step towards building a useful system and is flexible enough to build all
sorts of designs on top of.  I just want to point out that it's not as
simple as "point it at HDFS and let it do its magic".  There will be a
significant investment of time, probably a lot more than just putting Kafka
in front of Cassandra & HDFS.

Hopefully someone smarter than me can figure out how to make this work
well.

Jon

On Tue, Aug 9, 2016 at 11:27 AM Ryan Svihla  wrote:

> Jon,
>
> You know I've not actually spent the hour to read the ticket so I was just
> guessing it didn't handle dedup... all the same semantics apply
> though... you'd have to do a read before write and then allow some window
> of failure mode. Maybe if you were to LWT everything, but that sounds
> really slow... I'd be curious of your thoughts on how to do that well...
> maybe I'm missing something.
>
> Regards,
> Ryan Svihla
>
> On Aug 9, 2016, 1:13 PM -0500, Jonathan Haddad , wrote:
>
> I'm having a hard time seeing how anyone would be able to work with CDC in
> its current implementation of not doing any dedupe.  Unless you really
> want to write all your own logic for that, including failure handling + a
> distributed state machine, I wouldn't count on it as a solution.
>
> On Tue, Aug 9, 2016 at 10:49 AM Ryan Svihla  wrote:
>
>> You can follow the monster of a ticket
>> https://issues.apache.org/jira/browse/CASSANDRA-8844 and see if it
>> looks like the tradeoffs there are headed in the right direction for you.
>>
>> even CDC I think would have logically the same issue of not deduping for
>> you as triggers and dual writes, due to replication factor and consistency
>> level issues. Otherwise you'd be stuck doing an all-replica comparison when
>> a late event came in, and when a node was down what would you do then? What
>> if one replica got it as well and then came online much later? Even if you
>> were using a single source of truth style database, you'll find failover
>> has a way of losing late events anyway (due to async replication), not to
>> mention once you go multi-DC it's all a matter of which DC you're in.
>>
>> Anyway for the 

Re: Replicating Cassandra data to HDFS

2016-08-09 Thread Ryan Svihla
Jon,

You know I've not actually spent the hour to read the ticket so I was just
guessing it didn't handle dedup... all the same semantics apply though...
you'd have to do a read before write and then allow some window of failure
mode. Maybe if you were to LWT everything, but that sounds really slow...
I'd be curious of your thoughts on how to do that well... maybe I'm missing
something.

Regards,
Ryan Svihla

On Aug 9, 2016, 1:13 PM -0500, Jonathan Haddad , wrote:
> I'm having a hard time seeing how anyone would be able to work with CDC in
> its current implementation of not doing any dedupe. Unless you really want
> to write all your own logic for that, including failure handling + a
> distributed state machine, I wouldn't count on it as a solution.
> On Tue, Aug 9, 2016 at 10:49 AM Ryan Svihla <r...@foundev.pro> wrote:
> > You can follow the monster of a ticket 
> > https://issues.apache.org/jira/browse/CASSANDRA-8844 and see if it looks 
> > like the tradeoffs there are headed in the right direction for you.
> >
> > even CDC I think would have logically the same issue of not deduping for
> > you as triggers and dual writes, due to replication factor and consistency
> > level issues. Otherwise you'd be stuck doing an all-replica comparison when
> > a late event came in, and when a node was down what would you do then? What
> > if one replica got it as well and then came online much later? Even if you
> > were using a single source of truth style database, you'll find failover
> > has a way of losing late events anyway (due to async replication), not to
> > mention once you go multi-DC it's all a matter of which DC you're in.
> >
> > Anyway, for the cold storage I think a trailing window just greater than
> > your oldest allowed events would do it. I.e., if you choose to only accept
> > events up to 30 days late, then have cold storage trail by 32 days. At some
> > point there is no free lunch, as you point out, when replicating between
> > two data sources; i.e. CDC, triggers, really anything that marks a "new
> > event" will have the same problem, and you'll have to choose an acceptable
> > level of lateness or check for lateness indefinitely.
> >
> > Alternatively you can just accept duplication and handle it on the cold
> > storage read side (like an event sourcing pattern; this would be ideal if
> > the lateness is uncommon) or clean it up over time in cold storage as it's
> > detected (similar to an event sourcing pattern, but snapshotting data down
> > to a single record when you encounter it on a read).
> >
> > Best of luck, this is a corner case that requires hard tradeoffs in all 
> > technology I've encountered.
> >
> > Regards,
> > Ryan Svihla
> >
> >
> >
> > On Aug 9, 2016, 12:21 PM -0500, Ben Vogan <b...@shopkick.com> wrote:
> > > Thanks Ryan. I was hoping there was a change data capture framework. We 
> > > have late arriving events, some of which can be very late. We would have 
> > > to batch collect data for a large time period every so often to go back 
> > > and collect those or accept that we are going to lose a small percentage 
> > > of events. Neither of which is ideal.
> > >
> > > On Tue, Aug 9, 2016 at 10:30 AM, Ryan Svihla <r...@foundev.pro> wrote:
> > > > The typical pattern I've seen in the field is Kafka + consumers for
> > > > each destination (a variant of dual write, I know); this of course
> > > > would not work for your goal of relying on C* for dedup. Triggers
> > > > would also suffer the same problem unfortunately, so you're really
> > > > left with a batch job (most likely Spark) to move data from C* into
> > > > HDFS on a given interval. If this is really a cold storage use case
> > > > that can work quite well, especially assuming you've modeled your
> > > > data as a time series or with some sort of time-based bucketing so
> > > > you can quickly get full partitions of data out of C* in a
> > > > deterministic fashion and not have to scan your entire data set.
> > > >
> > > > I've also, for similar needs, seen Spark streaming + querying
> > > > Cassandra for duplication checks to dedup, then output to another
> > > > source (a form of dual write but with dedup); this was really silly
> > > > and slow. I only bring it up to save you the trouble in case you end
> > > > up going down the same path chasing something more 'real time'.
> > > >
> > > > Regards,
> > > > Ryan Svihla
> > > >
> > > >
> > > > On Aug 9, 2016, 11:09 AM -0500, Ben Vogan <b...@shopkick.com> wrote:
> > > > > Hi all,
> > > > >
> > > > > We are investigating using Cassandra in our data platform. We would 
> > > > > like data to go into Cassandra first and to eventually be replicated 
> > > > > into our data lake in HDFS for long term cold storage. Does anyone 
> > > > > know of a good way of doing this? We would rather not have parallel 
> > > > > writes to HDFS and Cassandra 

Re: Replicating Cassandra data to HDFS

2016-08-09 Thread Jonathan Haddad
I'm having a hard time seeing how anyone would be able to work with CDC in
its current implementation of not doing any dedupe.  Unless you really
want to write all your own logic for that, including failure handling + a
distributed state machine, I wouldn't count on it as a solution.

On Tue, Aug 9, 2016 at 10:49 AM Ryan Svihla  wrote:

> You can follow the monster of a ticket
> https://issues.apache.org/jira/browse/CASSANDRA-8844 and see if it
> looks like the tradeoffs there are headed in the right direction for you.
>
> even CDC I think would have logically the same issue of not deduping for
> you as triggers and dual writes, due to replication factor and consistency
> level issues. Otherwise you'd be stuck doing an all-replica comparison when
> a late event came in, and when a node was down what would you do then? What
> if one replica got it as well and then came online much later? Even if you
> were using a single source of truth style database, you'll find failover
> has a way of losing late events anyway (due to async replication), not to
> mention once you go multi-DC it's all a matter of which DC you're in.
>
> Anyway, for the cold storage I think a trailing window just greater than
> your oldest allowed events would do it. I.e., if you choose to only accept
> events up to 30 days late, then have cold storage trail by 32 days. At some
> point there is no free lunch, as you point out, when replicating between
> two data sources; i.e. CDC, triggers, really anything that marks a "new
> event" will have the same problem, and you'll have to choose an acceptable
> level of lateness or check for lateness indefinitely.
>
> Alternatively you can just accept duplication and handle it on the cold
> storage read side (like an event sourcing pattern; this would be ideal if
> the lateness is uncommon) or clean it up over time in cold storage as it's
> detected (similar to an event sourcing pattern, but snapshotting data down
> to a single record when you encounter it on a read).
>
> Best of luck, this is a corner case that requires hard tradeoffs in all
> technology I've encountered.
>
> Regards,
> Ryan Svihla
>
> On Aug 9, 2016, 12:21 PM -0500, Ben Vogan , wrote:
>
> Thanks Ryan.  I was hoping there was a change data capture framework.  We
> have late arriving events, some of which can be very late.  We would have
> to batch collect data for a large time period every so often to go back and
> collect those or accept that we are going to lose a small percentage of
> events.  Neither of which is ideal.
>
> On Tue, Aug 9, 2016 at 10:30 AM, Ryan Svihla  wrote:
>
>> The typical pattern I've seen in the field is Kafka + consumers for each
>> destination (a variant of dual write, I know); this of course would not
>> work for your goal of relying on C* for dedup. Triggers would also suffer
>> the same problem unfortunately, so you're really left with a batch job
>> (most likely Spark) to move data from C* into HDFS on a given interval. If
>> this is really a cold storage use case that can work quite well, especially
>> assuming you've modeled your data as a time series or with some sort of
>> time-based bucketing so you can quickly get full partitions of data out of
>> C* in a deterministic fashion and not have to scan your entire data set.
>>
>> I've also, for similar needs, seen Spark streaming + querying Cassandra
>> for duplication checks to dedup, then output to another source (a form of
>> dual write but with dedup); this was really silly and slow. I only bring
>> it up to save you the trouble in case you end up going down the same path
>> chasing something more 'real time'.
>>
>> Regards,
>> Ryan Svihla
>>
>> On Aug 9, 2016, 11:09 AM -0500, Ben Vogan , wrote:
>>
>> Hi all,
>>
>> We are investigating using Cassandra in our data platform.  We would like
>> data to go into Cassandra first and to eventually be replicated into our
>> data lake in HDFS for long term cold storage.  Does anyone know of a good
>> way of doing this?  We would rather not have parallel writes to HDFS and
>> Cassandra because we were hoping that we could use Cassandra primary keys
>> to de-duplicate events.
>>
>> Thanks,
>> --
>> 
>> *BENJAMIN VOGAN* | Data Platform Team Lead
>> shopkick (http://www.shopkick.com/)
>>
>> The indispensable app that rewards you for shopping.
>>
>>
>
>
> --
> 
> *BENJAMIN VOGAN* | Data Platform Team Lead
> shopkick (http://www.shopkick.com/)

Re: Replicating Cassandra data to HDFS

2016-08-09 Thread Ryan Svihla
You can follow the monster of a ticket 
https://issues.apache.org/jira/browse/CASSANDRA-8844 and see if it looks like 
the tradeoffs there are headed in the right direction for you.

even CDC I think would have logically the same issue of not deduping for you
as triggers and dual writes, due to replication factor and consistency level
issues. Otherwise you'd be stuck doing an all-replica comparison when a late
event came in, and when a node was down what would you do then? What if one
replica got it as well and then came online much later? Even if you were using
a single source of truth style database, you'll find failover has a way of
losing late events anyway (due to async replication), not to mention once you
go multi-DC it's all a matter of which DC you're in.

Anyway, for the cold storage I think a trailing window just greater than your
oldest allowed events would do it. I.e., if you choose to only accept events
up to 30 days late, then have cold storage trail by 32 days. At some point
there is no free lunch, as you point out, when replicating between two data
sources; i.e. CDC, triggers, really anything that marks a "new event" will
have the same problem, and you'll have to choose an acceptable level of
lateness or check for lateness indefinitely.
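
As a tiny illustration of that policy, a helper that decides when a day's
bucket is old enough to ship to cold storage (the 30/32-day figures are just
the example above; the class and method names are invented):

import java.time.LocalDate;

public class ColdStorageCutoff {
    // Accept events up to 30 days late; only ship a bucket once it is older than
    // that lateness window plus a small safety margin (30 + 2 = 32 days here).
    static final int LATENESS_DAYS = 30;
    static final int SAFETY_MARGIN_DAYS = 2;

    static boolean safeToShip(LocalDate bucketDay, LocalDate today) {
        return bucketDay.isBefore(today.minusDays(LATENESS_DAYS + SAFETY_MARGIN_DAYS));
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2016, 8, 9);
        System.out.println(safeToShip(LocalDate.of(2016, 7, 1), today));   // true: older than 32 days
        System.out.println(safeToShip(LocalDate.of(2016, 7, 20), today));  // false: still inside the window
    }
}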

Alternatively you can just accept duplication and handle it on the cold
storage read side (like an event sourcing pattern; this would be ideal if the
lateness is uncommon) or clean it up over time in cold storage as it's
detected (similar to an event sourcing pattern, but snapshotting data down to
a single record when you encounter it on a read).
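
A minimal sketch of that read-side snapshotting, assuming each cold-storage
record carries an id and a write time (the field and class names here are
invented for illustration):

import java.util.HashMap;
import java.util.Map;

public class ReadSideDedupe {
    // Minimal event shape; real cold-storage records would carry more fields.
    static final class Event {
        final String id;        // e.g. the Cassandra primary key rendered as a string
        final long writeTime;   // microseconds, like Cassandra's writetime()
        final String payload;
        Event(String id, long writeTime, String payload) {
            this.id = id; this.writeTime = writeTime; this.payload = payload;
        }
    }

    // Collapse duplicates on read: keep only the newest copy of each id.
    static Map<String, Event> snapshot(Iterable<Event> events) {
        Map<String, Event> latest = new HashMap<>();
        for (Event e : events) {
            latest.merge(e.id, e, (a, b) -> a.writeTime >= b.writeTime ? a : b);
        }
        return latest;
    }
}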

Best of luck, this is a corner case that requires hard tradeoffs in all 
technology I've encountered.

Regards,
Ryan Svihla

On Aug 9, 2016, 12:21 PM -0500, Ben Vogan , wrote:
> Thanks Ryan. I was hoping there was a change data capture framework. We have 
> late arriving events, some of which can be very late. We would have to batch 
> collect data for a large time period every so often to go back and collect 
> those or accept that we are going to lose a small percentage of events. 
> Neither of which is ideal.
>
> On Tue, Aug 9, 2016 at 10:30 AM, Ryan Svihla <r...@foundev.pro> wrote:
> > The typical pattern I've seen in the field is Kafka + consumers for each
> > destination (a variant of dual write, I know); this of course would not
> > work for your goal of relying on C* for dedup. Triggers would also suffer
> > the same problem unfortunately, so you're really left with a batch job
> > (most likely Spark) to move data from C* into HDFS on a given interval.
> > If this is really a cold storage use case that can work quite well,
> > especially assuming you've modeled your data as a time series or with
> > some sort of time-based bucketing so you can quickly get full partitions
> > of data out of C* in a deterministic fashion and not have to scan your
> > entire data set.
> >
> > I've also, for similar needs, seen Spark streaming + querying Cassandra
> > for duplication checks to dedup, then output to another source (a form of
> > dual write but with dedup); this was really silly and slow. I only bring
> > it up to save you the trouble in case you end up going down the same path
> > chasing something more 'real time'.
> >
> > Regards,
> > Ryan Svihla
> >
> >
> > On Aug 9, 2016, 11:09 AM -0500, Ben Vogan <b...@shopkick.com> wrote:
> > > Hi all,
> > >
> > > We are investigating using Cassandra in our data platform. We would like 
> > > data to go into Cassandra first and to eventually be replicated into our 
> > > data lake in HDFS for long term cold storage. Does anyone know of a good 
> > > way of doing this? We would rather not have parallel writes to HDFS and 
> > > Cassandra because we were hoping that we could use Cassandra primary keys 
> > > to de-duplicate events.
> > >
> > > Thanks, --
> > >
> > > BENJAMIN VOGAN | Data Platform Team Lead
> > > shopkick (http://www.shopkick.com/)
> > > The indispensable app that rewards you for shopping.
>
>
> --
>
> BENJAMIN VOGAN | Data Platform Team Lead
> shopkick (http://www.shopkick.com/)
> The indispensable app that rewards you for shopping.

Re: Replicating Cassandra data to HDFS

2016-08-09 Thread Ben Vogan
Thanks Ryan.  I was hoping there was a change data capture framework.  We
have late arriving events, some of which can be very late.  We would have
to batch collect data for a large time period every so often to go back and
collect those or accept that we are going to lose a small percentage of
events.  Neither of which is ideal.

On Tue, Aug 9, 2016 at 10:30 AM, Ryan Svihla  wrote:

> The typical pattern I've seen in the field is Kafka + consumers for each
> destination (a variant of dual write, I know); this of course would not work
> for your goal of relying on C* for dedup. Triggers would also suffer the
> same problem unfortunately, so you're really left with a batch job (most
> likely Spark) to move data from C* into HDFS on a given interval. If this
> is really a cold storage use case that can work quite well, especially
> assuming you've modeled your data as a time series or with some sort of
> time-based bucketing so you can quickly get full partitions of data out of
> C* in a deterministic fashion and not have to scan your entire data set.
>
> I've also, for similar needs, seen Spark streaming + querying Cassandra
> for duplication checks to dedup, then output to another source (a form of
> dual write but with dedup); this was really silly and slow. I only bring it
> up to save you the trouble in case you end up going down the same path
> chasing something more 'real time'.
>
> Regards,
> Ryan Svihla
>
> On Aug 9, 2016, 11:09 AM -0500, Ben Vogan , wrote:
>
> Hi all,
>
> We are investigating using Cassandra in our data platform.  We would like
> data to go into Cassandra first and to eventually be replicated into our
> data lake in HDFS for long term cold storage.  Does anyone know of a good
> way of doing this?  We would rather not have parallel writes to HDFS and
> Cassandra because we were hoping that we could use Cassandra primary keys
> to de-duplicate events.
>
> Thanks,
> --
> 
> *BENJAMIN VOGAN* | Data Platform Team Lead
> shopkick 
>  
>  
> 
>
> The indispensable app that rewards you for shopping.
>
>


-- 

*BENJAMIN VOGAN* | Data Platform Team Lead
shopkick (http://www.shopkick.com/)

The indispensable app that rewards you for shopping.


Re: Replicating Cassandra data to HDFS

2016-08-09 Thread Ryan Svihla
The typical pattern I've seen in the field is Kafka + consumers for each
destination (a variant of dual write, I know); this of course would not work
for your goal of relying on C* for dedup. Triggers would also suffer the same
problem unfortunately, so you're really left with a batch job (most likely
Spark) to move data from C* into HDFS on a given interval. If this is really
a cold storage use case that can work quite well, especially assuming you've
modeled your data as a time series or with some sort of time-based bucketing
so you can quickly get full partitions of data out of C* in a deterministic
fashion and not have to scan your entire data set.
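
To make the bucketing idea concrete, here is a hypothetical day-bucketed
schema and the per-bucket query a batch job might run (keyspace, table, and
column names are made up; the DataStax Java driver is assumed, and the actual
HDFS write is left to whatever tool does the export):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class BucketExport {
    // Hypothetical schema, bucketed by day so a batch job can pull whole partitions:
    //   CREATE TABLE events.by_day (
    //       day        date,
    //       event_time timestamp,
    //       event_id   uuid,
    //       payload    text,
    //       PRIMARY KEY ((day), event_time, event_id)
    //   );
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("events")) {

            // Deterministic per-interval extract: one partition per day, no full scan.
            ResultSet rs = session.execute(
                "SELECT event_time, event_id, payload FROM by_day WHERE day = ?",
                LocalDate.fromYearMonthDay(2016, 8, 9));

            for (Row row : rs) {
                // Hand each row to whatever writes the HDFS file (Spark, a plain Hadoop client, etc.).
                System.out.println(row.getUUID("event_id") + "\t" + row.getString("payload"));
            }
        }
    }
}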

I've also, for similar needs, seen Spark streaming + querying Cassandra for
duplication checks to dedup, then output to another source (a form of dual
write but with dedup); this was really silly and slow. I only bring it up to
save you the trouble in case you end up going down the same path chasing
something more 'real time'.

Regards,
Ryan Svihla

On Aug 9, 2016, 11:09 AM -0500, Ben Vogan , wrote:
> Hi all,
>
> We are investigating using Cassandra in our data platform. We would like data 
> to go into Cassandra first and to eventually be replicated into our data lake 
> in HDFS for long term cold storage. Does anyone know of a good way of doing 
> this? We would rather not have parallel writes to HDFS and Cassandra because 
> we were hoping that we could use Cassandra primary keys to de-duplicate 
> events.
>
> Thanks, --
>
> BENJAMIN VOGAN | Data Platform Team Lead
> shopkick (http://www.shopkick.com/)
> The indispensable app that rewards you for shopping.