[
https://issues.apache.org/jira/browse/FALCON-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13920883#comment-13920883
]
Srikanth Sundarrajan commented on FALCON-267:
---------------------------------------------
Jean-Baptiste Onofré and I had a chat conversation on this. Reproducing the
transcript with his permission so that others can chime in as well.
{code}
Jean-Baptiste Onofré: second question, about CDC and late-cut-off element
Srikanth Sundarrajan: yes
Jean-Baptiste Onofré: regarding in the sources, the late-cut-off element
creates a job in Oozie to check if a feed has changed
Jean-Baptiste Onofré: right ?
Srikanth Sundarrajan: no it doesn't
Srikanth Sundarrajan: let me walk you through the details
Jean-Baptiste Onofré: yes please
Srikanth Sundarrajan: Some notes
here: http://falcon.incubator.apache.org/docs/FalconArchitecture.html#Handling_late_input_data
Srikanth Sundarrajan: but to cover more details
Srikanth Sundarrajan: In the Falcon parent workflow (which is what is actually
scheduled in Oozie; the user's workflow is set up as a child sub-flow within that
parent workflow), there are two steps
Srikanth Sundarrajan: 1. record size (first step)
(process execution, retention or replication)
Srikanth Sundarrajan: record-size essentially is enabled only if the process or
feed replication is enabled for late handling
Jean-Baptiste Onofré: ok
Srikanth Sundarrajan: if it is enabled, the record-size step walks through all
the input data dirs and records their sizes in a separate file under the falcon
working dir
Srikanth Sundarrajan: in the post processing, parent workflow sends a message
to active-mq saying that the workflow is successful
Jean-Baptiste Onofré: ok, so you have the feed size at time T in the falcon
working dir
Srikanth Sundarrajan: yes
Srikanth Sundarrajan: now falcon server listens to the active-mq message
Jean-Baptiste Onofré: so, we can compare at T+1 to see if it changed or not,
right ?
Srikanth Sundarrajan: a little more involved there....
Jean-Baptiste Onofré: ok
Srikanth Sundarrajan: and when falcon server gets the message, if the process
or feed replication for which we got the message is enabled for late handling,
it puts the message for re-run into a delayed queue
Srikanth Sundarrajan: the delay is based on the late-rerun policy (periodic,
exp-backoff, final etc)
Srikanth Sundarrajan: now when the delay expires falcon server checks the
current sizes of all the feed input for that process / replication
Srikanth Sundarrajan: if anything has changed, falcon simply triggers a
workflow re-run through oozie to re-run it
Srikanth Sundarrajan: this keeps happening till the late-cut off of all the
dependent feeds expires, at which point we know that we no longer need to
consider changes
Srikanth Sundarrajan: makes sense ?
Jean-Baptiste Onofré: gotcha
Jean-Baptiste Onofré: the use case is mostly replication, right ?
Srikanth Sundarrajan: also for process which is generating new data
Jean-Baptiste Onofré: yes, I see
Srikanth Sundarrajan: essentially it covers all cases
Srikanth Sundarrajan: there is no special handling for replication or any
specific recipe for that matter
Jean-Baptiste Onofré: ok
Srikanth Sundarrajan: solution is very generic
Jean-Baptiste Onofré: as you said in Jira comment, maybe I can leverage
late-cut off for CDC
Jean-Baptiste Onofré: my purpose for CDC is:
Srikanth Sundarrajan: absolutely
Srikanth Sundarrajan: from your description, it seemed like there is a lot of
overlap
Jean-Baptiste Onofré: yes
Jean-Baptiste Onofré: the "missing" part would be the "gap" file in the falcon
working directory
Jean-Baptiste Onofré: let's say
Jean-Baptiste Onofré: 1/ a user submits a feed with late-cut off
Jean-Baptiste Onofré: thanks to late-cut off, using the "size file" in falcon
working directory, we can know if a feed has changed or not
Jean-Baptiste Onofré: 2/ if the feed changes, we send a message to an ActiveMQ
topic containing a location in the falcon working dir containing the gap file
(what has changed on the feed)
Jean-Baptiste Onofré: can I change the schedule of the sub-workflow in Oozie ?
Srikanth Sundarrajan: what is a gap file ? can you elaborate. sorry if it's
standard terminology.
Jean-Baptiste Onofré: no worries, it's my "wording" (it's probably not good)
Jean-Baptiste Onofré: the gap file is a file with a diff between two states of
a feed
Jean-Baptiste Onofré: for instance, feed "in" has size 10000
Jean-Baptiste Onofré: the late-cut off runs (let's say every 10mn) and now the
feed "in" has size 15000
Jean-Baptiste Onofré: the gap file would be the diff between feed "in" when its
size was 10000 and now that the size is 15000
Jean-Baptiste Onofré: maybe my use case doesn't make sense
Jean-Baptiste Onofré: I'm just evaluating and thinking out loud ;)
Srikanth Sundarrajan: since the input has changed, the existing mechanism
already identifies this change
Jean-Baptiste Onofré: oh ok
Srikanth Sundarrajan: It is not very clear to me, what is missing
Jean-Baptiste Onofré: so, let me try to summarize
Jean-Baptiste Onofré: 1/ if I create a feed "in" with late-cut off element,
when I submit this feed, I will have a job in Oozie which periodically checks
the size of the feed, right ?
Srikanth Sundarrajan: does this feed have a replication or are you considering
it purely as input for a process which generates new data based on this data
"in"
Srikanth Sundarrajan: or is it both ?
Jean-Baptiste Onofré: no replication for now
Jean-Baptiste Onofré: let's say I have a MapReduce job that "populates" the
location of the feed
Srikanth Sundarrajan: so we have only replication use case in consideration
Srikanth Sundarrajan: in that case, when the feed is "SCHEDULED", at submission
nothing happens mind you
Jean-Baptiste Onofré: it's what I saw ;)
Srikanth Sundarrajan: upon schedule, it creates an oozie coord for replication
Srikanth Sundarrajan: that has 3 steps in the workflow
Jean-Baptiste Onofré: but replication is to replicate/sync between two
clusters, right ?
Srikanth Sundarrajan: 1st step is record size
sync/distributed copy/replication
post processing
Srikanth Sundarrajan: yes replication is sync
Srikanth Sundarrajan: record size records saying "in" for time instance "T0" is
say S0, then the replication happens and the notification comes to falcon server
Jean-Baptiste Onofré: gotcha
Jean-Baptiste Onofré: for CDC, I'm considering only one cluster (no
replication), focused on the feed itself
Srikanth Sundarrajan: then we are talking about no replication, where this
feed's change has to impact the processes consuming this feed. correct ?
Jean-Baptiste Onofré: correct
Srikanth Sundarrajan: in which case, feed has no role to play
Srikanth Sundarrajan: upon feed submission or schedule, size is not recorded
Jean-Baptiste Onofré: ok
Srikanth Sundarrajan: size is actually recorded during that process execution
Srikanth Sundarrajan: so the responsibility for tracking what has changed lies
with the process and not with the feed
Jean-Baptiste Onofré: hmmm, I see
Jean-Baptiste Onofré: maybe we can provide a "CDC" process so
Jean-Baptiste Onofré: taking the feed
Srikanth Sundarrajan: so you are interested in simply auditing changes to the
feed independent of its usage by any process or replication
Jean-Baptiste Onofré: right
Srikanth Sundarrajan: do I understand your ask correctly ?
Jean-Baptiste Onofré: it's pure Change Data Capture
Jean-Baptiste Onofré: the purpose is to "monitor" HDFS locations, detect
changes on the locations, and send a message to a topic
Srikanth Sundarrajan: when the change is detected, what action needs to be
performed ?
Jean-Baptiste Onofré: the subscribers on the topic will receive a notification
that a location changed
Jean-Baptiste Onofré: the action is the responsibility of the subscribers
Srikanth Sundarrajan: so simple notification to the subscribers
Jean-Baptiste Onofré: they can trigger a process, do what they want
Jean-Baptiste Onofré: yes
Srikanth Sundarrajan: now the question is. you mentioned that there is a M/R
job that generates data for this feed. correct ?
Srikanth Sundarrajan: is that a falcon process ?
Jean-Baptiste Onofré: for instance, my demo would be to use a Camel route to
get the notification, and send e-mail, etc
Jean-Baptiste Onofré: the M/R job is a pure job, not necessarily a falcon
process (but it could be if it's easier)
Srikanth Sundarrajan: the reason I am asking is
Srikanth Sundarrajan: this is a feature that is already built into falcon
Srikanth Sundarrajan: Every falcon process on post-processing has an ability to
send notification to both a falcon topic that the system listens to and a user
topic
Srikanth Sundarrajan: this is different from the standard server topic
Jean-Baptiste Onofré: if it already exists in Falcon, and the "constraint" is
that the M/R job has to be a falcon process, it's fine for me
Srikanth Sundarrajan: so essentially messages can go to two separate topics
Jean-Baptiste Onofré: but I need a process in any case right ?
Srikanth Sundarrajan: yes
Jean-Baptiste Onofré: that was my mistake, I was just on feed level
Srikanth Sundarrajan: no issues. So it looks like the existing features map
well to your requirement
Srikanth Sundarrajan: except that the data generating job has to be mapped
inside a process
Srikanth Sundarrajan: then it should work well
Srikanth Sundarrajan: I just realised that this info is not in the docs
Srikanth Sundarrajan: it might be useful to add them
Jean-Baptiste Onofré: yes, I will prepare some HowTo with detail like this
Jean-Baptiste Onofré: it could be interesting for the users
Srikanth Sundarrajan: great. that will be super helpful
Srikanth Sundarrajan: Do you mind if I share this transcript on the dev mailing
list, or you could share it as well if that is ok
Jean-Baptiste Onofré: not at all !
Jean-Baptiste Onofré: I will prepare the CDC use case adding a job as a falcon
process
Jean-Baptiste Onofré: is there any way to tune the late-cut off workflow
interval ?
Jean-Baptiste Onofré: I saw in Oozie that the interval is very large
Jean-Baptiste Onofré: for instance, I would like to check the feed every 10mn
Srikanth Sundarrajan: that is customisable in the process definition
Jean-Baptiste Onofré: it's in the late-cut off value itself, right ?
Srikanth Sundarrajan: no
Srikanth Sundarrajan: <xs:complexType name="late-process">
    <xs:sequence>
        <xs:element type="late-input" name="late-input" maxOccurs="unbounded" minOccurs="1">
            <xs:annotation>
                <xs:documentation>
                    For each input, defines the workflow that should be run
                    when late data is detected
                </xs:documentation>
            </xs:annotation>
        </xs:element>
    </xs:sequence>
    <xs:attribute type="policy-type" name="policy" use="required"/>
    <xs:attribute type="frequency-type" name="delay" use="required"/>
</xs:complexType>
Srikanth Sundarrajan: here the policy-type can be periodic etc
Srikanth Sundarrajan: and frequency-type can be minutes(10)
Jean-Baptiste Onofré: ok
Jean-Baptiste Onofré: awesome
Jean-Baptiste Onofré: it makes lot of sense
Jean-Baptiste Onofré: thanks a lot, I will go this way (I will work on it
tomorrow; today, I would like to submit the patch about user ACL check at
entity submission time)
Srikanth Sundarrajan: super;
Srikanth Sundarrajan: I will take a section of this doc and attach it to
FALCON-267, where the CDC feature was discussed, instead of leaving it as a
loose hanging thread. What do you think ?
Jean-Baptiste Onofré: +1
Jean-Baptiste Onofré: I will document about the CDC use case
Jean-Baptiste Onofré: and add documentation about the workflow/use cases that
we discussed
Srikanth Sundarrajan: please do. thanks Jean.
{code}
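The late-data check walked through in the transcript can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not Falcon's implementation: the names `delays` and `late_reruns` are hypothetical, the policy semantics ("periodic" repeats at a fixed delay, "exp-backoff" doubles the wait, "final" checks once at the cut-off) are a simplification, and the sketch assumes that each re-run records the input sizes again.

```python
from typing import Callable, Dict, List

Sizes = Dict[str, int]  # input path -> recorded size in bytes


def delays(policy: str, base: int, cutoff: int) -> List[int]:
    """Offsets (minutes after the first run) at which inputs are re-checked.

    Illustrative semantics only (an assumption, not Falcon's scheduler).
    """
    if policy == "final":
        return [cutoff]
    offsets, wait, elapsed = [], base, base
    while elapsed <= cutoff:
        offsets.append(elapsed)
        if policy == "exp-backoff":
            wait *= 2  # double the wait after every check
        elapsed += wait
    return offsets


def late_reruns(recorded: Sizes, measure: Callable[[], Sizes],
                offsets: List[int]) -> List[int]:
    """Return the offsets at which a workflow re-run would be triggered."""
    triggered = []
    for offset in offsets:
        current = measure()  # current sizes of all inputs
        if current != recorded:
            triggered.append(offset)
            recorded = current  # assumption: the re-run records sizes again
    return triggered


# Feed "in" grows from 10000 to 15000 between the first and second check.
snapshots = iter([{"in": 10000}, {"in": 15000}, {"in": 15000}])
print(late_reruns({"in": 10000}, lambda: next(snapshots),
                  delays("periodic", 10, 30)))
```

With a periodic policy, a 10-minute delay and a 30-minute cut-off, the inputs are checked three times and only the check that sees the size change triggers a re-run.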
> Add CDC feature
> ---------------
>
> Key: FALCON-267
> URL: https://issues.apache.org/jira/browse/FALCON-267
> Project: Falcon
> Issue Type: New Feature
> Reporter: Jean-Baptiste Onofré
> Assignee: Jean-Baptiste Onofré
>
> I propose to add a Change Data Capture feature in Falcon.
> The idea is to be able to catch changes, initially on HDFS files, and publish
> the identified gap to a messaging topic.
> It's what I would like to PoC:
> - in a feed definition, we add a <capture/> element defining the change check
> interval.
> - we create a coordinator in oozie which executes the following workflow at
> capture interval
> - in the Falcon staging "capture" location on HDFS, we keep the first state
> of the feed. We compare (diff) the current content with the staging location,
> and write the diff in the Falcon staging. If the file is a binary, we can
> detect a change (using MD5 for instance) and the diff is the complete file
> (like in svn, git, etc).
> - if we have a diff, we publish a message in the Falcon "capture" topic
> (containing a set of JMS properties; the message body contains the link to
> the diff on HDFS, in the Falcon staging). The "stream" copy is overwritten by
> the new one.
> The purpose of this CDC is:
> 1/ thanks to the publication on the topic, to be able to use "external" tools
> to "react" when a change occurs. For instance, I plan to make a demo with an
> Apache Camel route (sending e-mails for example) when data changes.
> 2/ staying in falcon/oozie/hadoop, to be able to set up a pipeline triggered
> by data change: for instance, trigger a job when the data changes.
> The first PoC is HDFS/fs centric but I think we can do diff on HBase or Hive.
> Thoughts ?
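
The change-detection step proposed in the description (keep a previous state, compare it with the current one, use MD5 to spot modified files) could look roughly like the Python sketch below. It makes several assumptions: a local directory stands in for the HDFS feed location, `snapshot` and `changed_paths` are hypothetical names, and publishing the result to a messaging topic is left out.

```python
import hashlib
from pathlib import Path
from typing import Dict, List


def snapshot(root: Path) -> Dict[str, str]:
    """Map each file under `root` (relative POSIX path) to its MD5 hex digest."""
    return {
        p.relative_to(root).as_posix(): hashlib.md5(p.read_bytes()).hexdigest()
        for p in sorted(root.rglob("*"))
        if p.is_file()
    }


def changed_paths(previous: Dict[str, str], current: Dict[str, str]) -> List[str]:
    """Paths that were added, removed, or modified between two snapshots."""
    return sorted(
        path
        for path in previous.keys() | current.keys()
        if previous.get(path) != current.get(path)
    )
```

A caller would store the result of `snapshot` at each capture interval and notify subscribers only when `changed_paths` returns a non-empty list.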