This (updates) is something we are going to think about in the next release
or two.

On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:

> Sorry, apparently I only replied to Reynold when I meant to copy the list as
> well, so I'm self-replying and taking the opportunity to illustrate with an
> example.
>
> Basically I want to conceptually do this:
>
> val bigDf = sqlContext.sparkContext.parallelize(1 to 1000000)
>   .map(i => (i, 1)).toDF("k", "v")
> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000))
>   .map(i => (i, 1)).toDF("k", "v")
>
> bigDf.cache()
>
> bigDf.registerTempTable("big")
> deltaDf.registerTempTable("delta")
>
> val newBigDf = sqlContext.sql(
>   "SELECT big.k, big.v + IF(delta.v IS NULL, 0, delta.v) AS v " +
>   "FROM big LEFT JOIN delta ON big.k = delta.k")
>
> newBigDf.cache()
> bigDf.unpersist()
>
>
> This is essentially an update of keys "1" and "50000" only, in a dataset
> of 1 million keys.
>
> This could be achieved efficiently if the join preserved the unaffected
> cached blocks, and only copied and mutated the 2 affected blocks
> corresponding to the matching join keys.
>
> Statistics can determine which blocks actually need mutating. Note also that
> no shuffle is required, assuming both DataFrames are pre-partitioned by the
> same key K, as in the sketch below.
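>
> A minimal sketch of the pre-partitioning I have in mind (the HashPartitioner
> and the number of partitions are just illustrative, and whether the planner
> actually exploits the co-partitioning is exactly the open question):
>
> import org.apache.spark.HashPartitioner
> import sqlContext.implicits._
>
> // Partition both sides by key k with the same partitioner, so that in
> // principle the left join could be evaluated without a shuffle.
> val partitioner = new HashPartitioner(8)
>
> val bigRdd = sqlContext.sparkContext.parallelize(1 to 1000000)
>   .map(i => (i, 1)).partitionBy(partitioner)
> val deltaRdd = sqlContext.sparkContext.parallelize(Array(1, 50000))
>   .map(i => (i, 1)).partitionBy(partitioner)
>
> val bigDf = bigRdd.toDF("k", "v")
> val deltaDf = deltaRdd.toDF("k", "v")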
>
> In SQL this could actually be expressed as an UPDATE statement or, for more
> generalized use, as a MERGE UPDATE:
> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
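>
> Purely to illustrate the shape (Spark SQL has no MERGE support, so this is
> just the T-SQL form from the page above written out for the big/delta
> example, not something that runs today):
>
> // Hypothetical: the whole left-join-and-recache dance above collapses to a
> // single in-place merge statement.
> val mergeSql = """
>   MERGE INTO big AS target
>   USING delta AS source
>   ON target.k = source.k
>   WHEN MATCHED THEN UPDATE SET target.v = target.v + source.v;
> """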
>
> While this may seem like a very special-case optimization, it would
> effectively implement UPDATE support for cached DataFrames, for both
> optimal and non-optimal usage.
>
> I appreciate there's quite a lot here, so thank you for taking the time to
> consider it.
>
> Cristian
>
>
>
> On 12 November 2015 at 15:49, Cristian O <cristian.b.op...@googlemail.com>
> wrote:
>
>> Hi Reynold,
>>
>> Thanks for your reply.
>>
>> Parquet may very well be used as the underlying implementation, but this
>> is more than about a particular storage representation.
>>
>> There are a few things here that are interrelated and open up different
>> possibilities, so it's hard to structure, but I'll give it a try:
>>
>> 1. Checkpointing DataFrames - while a DF can be saved locally as Parquet,
>> just using that as a checkpoint would currently require explicitly reading
>> it back. A proper checkpoint implementation would just save (perhaps
>> asynchronously) and prune the logical plan, while allowing the same DF,
>> now backed by the checkpoint, to continue being used.
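>>
>> Roughly what I mean, as a manual workaround today (df stands for the
>> DataFrame being checkpointed and the path is just an example):
>>
>> // Writing out and reading back truncates the logical plan, at the cost of
>> // an explicit, synchronous round trip through disk.
>> df.write.mode("overwrite").parquet("/tmp/df-checkpoint")
>> val dfCheckpointed = sqlContext.read.parquet("/tmp/df-checkpoint")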
>>
>> It's important to prune the logical plan to avoid all kinds of issues
>> that may arise from unbounded expansion with iterative use-cases, like this
>> one I encountered recently:
>> https://issues.apache.org/jira/browse/SPARK-11596
>>
>> But really what I'm after here is:
>>
>> 2. Efficient updating of cached DataFrames - The main use case here is
>> keeping a relatively large dataset cached and updating it iteratively from
>> streaming. For example, one would like to perform ad-hoc queries on an
>> incrementally updated, cached DataFrame. I expect this is already becoming
>> an increasingly common use case. Note that the dataset may require merging
>> (like adding) or overriding values by key, so simply appending is not
>> sufficient.
>>
>> This is very similar in concept to updateStateByKey for regular RDDs,
>> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
>> level (the row blocks of the columnar representation).
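>>
>> For reference, the RDD-level analogue I have in mind, where deltas is
>> assumed to be a DStream[(Int, Int)] of per-key increments from the stream:
>>
>> // Maintain a running per-key total across batches; keys present in the
>> // previous state or in the new batch are carried forward.
>> val state = deltas.updateStateByKey[Int] { (incs: Seq[Int], prev: Option[Int]) =>
>>   Some(prev.getOrElse(0) + incs.sum)
>> }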
>>
>> This can currently be simulated with UNION or (OUTER) JOINs, but that is
>> very inefficient, as it requires copying and recaching the entire dataset
>> and unpersisting the original one. There are also the aforementioned
>> problems with unbounded logical plans (physical plans are fine).
>>
>> These two together, checkpointing and updating cached DataFrames, would
>> give fault-tolerant, efficient updating of DataFrames, meaning streaming
>> apps could take advantage of the compact columnar representation and
>> Tungsten optimisations.
>>
>> I'm not quite sure if something like this can be achieved by other means
>> or has been investigated before, hence why I'm looking for feedback here.
>>
>> While one could use external data stores, they would add an IO penalty,
>> and most of what's available at the moment is either HDFS (extremely
>> inefficient for updates) or key-value stores with a 5-10x space overhead
>> over columnar formats.
>>
>> Thanks,
>> Cristian
>>
>>
>>
>>
>>
>>
>> On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Thanks for the email. Can you explain what the difference is between
>>> this and existing formats such as Parquet/ORC?
>>>
>>>
>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <cristian.b.op...@googlemail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I was wondering if there's any planned support for local disk columnar
>>>> storage.
>>>>
>>>> This could be an extension of the in-memory columnar store, or possibly
>>>> something similar to the recently added local checkpointing for RDDs.
>>>>
>>>> This could also have the added benefit of enabling iterative usage for
>>>> DataFrames by pruning the query plan through local checkpoints.
>>>>
>>>> A further enhancement would be to add update support to the columnar
>>>> format (in the immutable, copy-on-write sense of course) by maintaining
>>>> references to unchanged row blocks and only copying and mutating the
>>>> ones that have changed, as in the toy sketch below.
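>>>>
>>>> A toy sketch of the copy-on-write idea at block level (made-up types for
>>>> illustration only, not Spark's internal representation):
>>>>
>>>> // Each block keeps min/max key statistics; a delta rewrites only the
>>>> // blocks whose key range it touches, all other blocks are reused as-is.
>>>> case class Block(minKey: Int, maxKey: Int, rows: Vector[(Int, Int)])
>>>>
>>>> def applyDelta(blocks: Vector[Block], delta: Map[Int, Int]): Vector[Block] =
>>>>   blocks.map { b =>
>>>>     if (delta.keys.exists(k => k >= b.minKey && k <= b.maxKey))
>>>>       b.copy(rows = b.rows.map { case (k, v) => (k, v + delta.getOrElse(k, 0)) })
>>>>     else b
>>>>   }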
>>>>
>>>> A use case here is streaming and merging updates into a large dataset
>>>> that can be stored efficiently in an internal columnar format, rather
>>>> than accessing a less efficient external data store like HDFS or
>>>> Cassandra.
>>>>
>>>> Thanks,
>>>> Cristian
>>>>
>>>
>>>
>>
>
