[
https://issues.apache.org/jira/browse/FALCON-1240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14631740#comment-14631740
]
Venkat Ramachandran commented on FALCON-1240:
---------------------------------------------
Late Data Handling:
Adding the email discussions between Srikanth, Ajay, Venkat and Venky:
Srikanth,
I’m making those changes to XSD before uploading to the JIRA.
Late or out-of-band data:
One thing we discussed about the partitioned layout is that the Falcon/Oozie
coordinator will supply the time interval, and the underlying tool (Sqoop or
Kafka/Camus) will use the start and end timestamps to pull the data.
This way, the tool does not need to do repartitioning or spraying by itself
(and no post-processing job is needed).
But with this approach, if late data gets inserted into the database table with
an older timestamp (fact table -> timestamp column), the Sqoop job will not
pull it, since the high-water mark is already ahead.
k1, ts-hour1, <…>
k2, ts-hour1, <…>
k3, ts-hour1, <…>
The coordinator nominal time (data date) is set to ts-hour1, so ingestion pulls
everything for hour ts-hour1.
At the next materialization, the coordinator nominal time is set to ts-hour2,
so ingestion is supposed to pull everything for hour ts-hour2.
Now, if someone inserts the following event, it will be permanently lost on
the Hadoop end, since we will never pull it: the high-water mark has
progressed.
k3, ts-hour1, <…>
* Unless someone recognizes this as reprocessing and resets the Falcon start
dates in the Feed, etc.
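A tiny Python sketch of the failure mode above (the names are illustrative
only; this is not Falcon or Sqoop code): the pull is keyed on the
event-timestamp hour, so a row inserted later with an older timestamp is never
picked up once the high-water mark has moved past that hour.

```python
# Each row is (key, event_timestamp); the "k5" row stands in for the
# late-arriving event from the example above.
rows = [
    ("k1", "ts-hour1"),
    ("k2", "ts-hour1"),
    ("k3", "ts-hour1"),
]

def pull(table, hour):
    """Pull all rows whose EVENT timestamp falls in the given hour."""
    return [r for r in table if r[1] == hour]

hour1 = pull(rows, "ts-hour1")        # nominal time ts-hour1: 3 rows

# The high-water mark advances; the next run only looks at ts-hour2.
rows.append(("k4", "ts-hour2"))
hour2 = pull(rows, "ts-hour2")        # nominal time ts-hour2: 1 row

# Late insert with an OLD timestamp, after the mark has moved on:
rows.append(("k5", "ts-hour1"))

# No future run re-reads ts-hour1, so the late row is never ingested.
hour2_again = pull(rows, "ts-hour2")
assert ("k5", "ts-hour1") not in hour1 + hour2 + hour2_again
```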
Any thoughts on this one?
-----------------------------------------------
This is not a trivial problem, and pulling new data that arrives late via
Sqoop isn't enough, as there may be downstream jobs that are in the process of
consuming it or have already consumed it. One assumption largely made is
that facts are immutable. Falcon has a concept called late data
handling. If it is wired up correctly, it will solve this problem. The way
we ought to do this is to find a cheap way to detect that older time-bucket
data arrived out of band, which the Falcon server can use to determine whether
a completed action has to be retriggered. You can essentially reuse the
framework that already exists for this. Makes sense? Are there gaps in the
approach?
Regards
Srikanth Sundarrajan
----------------------------------
Let’s look at the two specific cases – RDBMS and Kafka
RDBMS – Records with older timestamps may be inserted into the fact table.
Unless there is a change-capture column (say, a modified or updated TS column;
note that this is different from the date/timestamp column that determines the
date/timeseries partition), it is impossible to extract the newly inserted
records with older timestamps. Maybe we should mandate a
change-capture column (either some sort of transaction id that is monotonically
increasing, or a modified/updated timestamp column) for any type of table
(dimension or fact) that we need to Sqoop via Falcon; this solves the issue of
identifying and extracting new data with old timestamps.
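To illustrate the proposed rule, here is a small Python sketch (the column
name modified_ts and the checkpoint values are hypothetical): extraction
filters on the monotonically increasing change-capture column, while the event
timestamp is reserved purely for partition placement.

```python
# Each row is (key, event_ts, modified_ts). The event timestamp says which
# date partition the row belongs to; modified_ts says when it was written.
rows = [
    ("k1", "2015-07-15T01", 100),
    ("k2", "2015-07-15T01", 101),
    # Late arrival: an OLD event timestamp, but a NEW modification stamp.
    ("k3", "2015-07-15T01", 205),
]

def incremental_pull(table, last_mark):
    """SELECT * WHERE modified_ts > :last_mark (change capture, not event time)."""
    return [r for r in table if r[2] > last_mark]

first = incremental_pull(rows[:2], 0)   # initial load sees k1, k2; mark -> 101
late = incremental_pull(rows, 101)      # next run still finds the late k3 row
assert late == [("k3", "2015-07-15T01", 205)]
```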
Kafka – There is an implied change-capture column in Kafka called the offset.
If you are publishing events with older timestamps (each event needs to have a
timestamp), they will still have an increasing offset. So, you ask for events
whose offsets are greater than the previous offset you recorded. This solves
extracting newly published events that may have older timestamps.
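A minimal Python simulation of that offset behavior (a plain list standing in
for a single Kafka partition log; this is not the real client API): an event
published late with an old timestamp still receives a new, higher offset, so
"offset >= last committed" always finds it.

```python
log = []  # each entry is (offset, event_ts, payload)

def publish(event_ts, payload):
    # Offsets are assigned in append order, independent of the event timestamp.
    log.append((len(log), event_ts, payload))

publish("ts-hour1", "a")
publish("ts-hour2", "b")
committed = len(log)          # the consumer has read offsets 0..1

publish("ts-hour1", "late")   # old timestamp, but it gets offset 2

new_events = [e for e in log if e[0] >= committed]
assert new_events == [(2, "ts-hour1", "late")]
```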
The second common problem is that once you get a set of events or rows (RDBMS),
we will use the timestamp field to spray them into the proper partitions on
HCatalog. If I'm correct, Falcon currently provides static keys, meaning I
pre-determine which partition to write into (datestamp=20150715) without
looking at the timestamp field in the records.
We need to figure out how to make Sqoop or Kafka use HCatalog dynamic
partitioning by mapping a field from the record to a partition via the
Falcon definition/infrastructure.
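A short Python sketch of the dynamic-partition idea (the function and field
names are made up for illustration; this is not the HCatalog API): the
partition key is derived from each record's own timestamp field rather than
fixed once per run, so a late event with an older timestamp lands in its
correct older partition.

```python
from collections import defaultdict

def spray(records, partition_field):
    """Group records into partitions keyed by a field in each record."""
    partitions = defaultdict(list)
    for rec in records:
        partitions["datestamp=" + rec[partition_field]].append(rec)
    return dict(partitions)

records = [
    {"key": "k1", "ts": "20150715"},
    {"key": "k2", "ts": "20150716"},
    {"key": "k5", "ts": "20150715"},  # late event carrying an older timestamp
]
out = spray(records, "ts")
assert sorted(out) == ["datestamp=20150715", "datestamp=20150716"]
assert len(out["datestamp=20150715"]) == 2
```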
I’m being a bit verbose to make it clear. Let me know if we are on the same page.
> Data Import and Export
> -----------------------
>
> Key: FALCON-1240
> URL: https://issues.apache.org/jira/browse/FALCON-1240
> Project: Falcon
> Issue Type: New Feature
> Components: acquisition
> Reporter: Venkat Ramachandran
> Assignee: Venkat Ramachandran
> Attachments: Falcon Data Ingestion - Proposal.docx
>
>
> JIRA to track Data Import and Export design and implementation discussions
> Attaching proposal to start with.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)