Christian Kurz created SPARK-12097:
--------------------------------------
Summary: How to do a cached, batched JDBC-lookup in Spark
Streaming?
Key: SPARK-12097
URL: https://issues.apache.org/jira/browse/SPARK-12097
Project: Spark
Issue Type: Brainstorming
Components: Streaming
Reporter: Christian Kurz
h3. Use-case
I need to enrich incoming Kafka data with data from a lookup table (or query)
on a relational database. Lookup data changes slowly over time (so caching
is okay for a certain retention time). Lookup data is potentially huge (so
loading all of it upfront is not an option).
h3. Problem
The overall design idea is to implement a cached and batched JDBC lookup. That
is, for any lookup keys missing from the lookup cache, a JDBC lookup is done to
retrieve the missing lookup data. JDBC lookups are rather expensive (connection
overhead, number of round-trips) and therefore must be done in batches, e.g.
one JDBC lookup per 100 missing keys.
So the high-level logic might look something like this:
# For every Kafka RDD we extract all lookup keys
# For all lookup keys we check whether the lookup data is already available
in the cache and whether this cached information has not yet expired.
# For any lookup keys not found in cache (or expired), we send batched prepared
JDBC Statements to the database to fetch the missing lookup data:
{{SELECT c1, c2, c3 FROM ... WHERE k1 in (?,?,?,...)}}
to minimize the number of JDBC round-trips.
# At this point we have up-to-date lookup data for all lookup keys and can
perform the actual lookup operation.
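The four steps above could be sketched in plain Python (outside of Spark) like this. Here {{jdbc_lookup}} is a hypothetical callable standing in for the batched {{SELECT ... WHERE k1 IN (?,?,...)}} query: it takes a list of keys and returns a dict of key -> row.

```python
import time

class ExpiringLookupCache:
    """Cache with a retention time, filling misses via batched JDBC lookups."""

    def __init__(self, ttl_seconds, batch_size, jdbc_lookup):
        self.ttl = ttl_seconds
        self.batch_size = batch_size
        self.jdbc_lookup = jdbc_lookup   # hypothetical batched-query helper
        self.cache = {}                  # key -> (row, fetched_at)

    def lookup(self, keys, now=None):
        now = time.time() if now is None else now
        # Step 2: keep only keys that are missing from cache or expired.
        missing = [k for k in dict.fromkeys(keys)   # de-dup, keep order
                   if k not in self.cache or now - self.cache[k][1] >= self.ttl]
        # Step 3: one JDBC round-trip per `batch_size` missing keys.
        for i in range(0, len(missing), self.batch_size):
            batch = missing[i:i + self.batch_size]
            for k, row in self.jdbc_lookup(batch).items():
                self.cache[k] = (row, now)
        # Step 4: all resolvable keys now have up-to-date lookup data.
        return {k: self.cache[k][0] for k in keys if k in self.cache}
```

This is only a sketch of the caching/batching logic itself; the open question below is where such logic should live in a Spark Streaming job.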
Does this approach make sense on Spark? Would Spark State DStreams be the right
way to go? Or other design approaches?
Assuming Spark State DStreams are the right direction, the low-level question
is: how to do the batching?
Would this particular signature of {{DStream.updateStateByKey}} (iterator ->
iterator):
{code:borderStyle=solid}
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean,
    initialRDD: RDD[(K, S)]
)
{code}
be the right way to batch multiple incoming keys into a single JDBC-lookup
query?
Would the new {{DStream.trackStateByKey()}} be a better approach?
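Either way, the per-partition batching an iterator -> iterator update function enables could look something like the following sketch (plain Python, with value and state types simplified to strings; {{jdbc_lookup}} is again a hypothetical helper doing one batched IN-list query per partition):

```python
def batched_update_func(jdbc_lookup, partition):
    """Iterator -> iterator update function in the spirit of the
    updateStateByKey signature above. Each element of `partition` is a
    (key, new_values, prev_state) tuple; prev_state is None on a cache miss."""
    entries = list(partition)            # materialize the partition once
    # Collect every key with no cached state into a single batch ...
    missing = [k for k, _, prev in entries if prev is None]
    # ... and resolve them with one JDBC round-trip for the whole partition.
    fetched = jdbc_lookup(missing) if missing else {}
    for k, _, prev in entries:
        state = prev if prev is not None else fetched.get(k)
        if state is not None:
            yield (k, state)
```

The design point is that the iterator variant sees the whole partition at once, so all misses of a partition can be folded into one query instead of one query per key.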
The second, more high-level question: is there a way to chain multiple state
operations on the same state object?
Going with the above design approach the entire lookup logic would be
handcrafted into some java/scala/python {{updateFunc}}. This function would go
over all incoming keys, check which ones are missing from cache, batch the
missing ones, run the JDBC queries and union the returned lookup data with the
existing cache from the State object.
The fact that all of this must be handcrafted into a single function seems to
be caused by the fact that Spark State processing logic on a high-level works
like this:
{code:borderStyle=solid}
input:  prevStateRdd, inputRdd
output: updateStateFunc( prevStateRdd, inputRdd )
{code}
So there is only a single {{updateStateFunc}}, operating on prevStateRdd and
inputRdd in one go. Once it is done, there is no way to further refine the
state as part of the current micro-batch.
The multi-step processing required here sounds like a typical use-case for a
DStream: apply multiple operations one after the other on some incoming data.
So I wonder whether there is a way to extend the concept of state processing
(maybe it has already been extended?) to do something like:
{code:borderStyle=solid}
input: prevStateRdd, inputRdd
missingKeys   = inputRdd.filter( <not exists in prevStateRdd> )
foundKeys     = inputRdd.filter( <exists in prevStateRdd> )
newLookupData = lookupKeysUsingJdbcDataFrameRead( missingKeys.collect() )
newStateRdd   = newLookupData.union( foundKeys ).union( prevStateRdd )
output: newStateRdd
{code}
This would nicely leverage all the power and richness of Spark. The only
missing bit - and the reason why this approach does not work today (based on my
naive understanding of Spark) - is that {{newStateRdd}} cannot be declared to
be the {{prevStateRdd}} of the next micro-batch.
If Spark had a way of declaring an RDD (or DStream) to be the parent for the
next batch run, even complex (chained) state operations would be easy to
describe and would not require hand-written Java/Python/Scala update functions.
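To make the intended semantics concrete, the chained-state pseudocode above can be simulated with a plain dict standing in for prevStateRdd ({{jdbc_lookup}} is once more a hypothetical batched-query helper):

```python
def next_state(prev_state, input_keys, jdbc_lookup):
    """One micro-batch of the pseudocode: fetch rows for keys missing from
    the previous state, then union them with the existing state."""
    missing_keys = [k for k in set(input_keys) if k not in prev_state]
    new_lookup_data = jdbc_lookup(missing_keys) if missing_keys else {}
    # newStateRdd = newLookupData.union( foundKeys ).union( prevStateRdd )
    return {**prev_state, **new_lookup_data}
```

The feature being asked for is exactly the ability to feed the returned state back in as {{prev_state}} of the next micro-batch, expressed with ordinary RDD/DStream operations rather than a single monolithic update function.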
Thanks a lot for taking the time to read all of this!!!
Any thoughts/pointers are much appreciated.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)