FiloDB is also closely reated.  https://github.com/tuplejump/FiloDB

On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
> input/output format support:
> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>
> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <r...@databricks.com> wrote:
>
>> This (updates) is something we are going to think about in the next
>> release or two.
>>
>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>> cristian.b.op...@googlemail.com> wrote:
>>
>>> Sorry, apparently only replied to Reynold, meant to copy the list as
>>> well, so I'm self replying and taking the opportunity to illustrate with an
>>> example.
>>>
>>> Basically I want to conceptually do this:
>>>
>>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => 
>>> (i, 1)).toDF("k", "v")
>>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => 
>>> (i, 1)).toDF("k", "v")
>>>
>>> bigDf.cache()
>>>
>>> bigDf.registerTempTable("big")
>>> deltaDf.registerTempTable("delta")
>>>
>>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, 
>>> delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>>
>>> newBigDf.cache()
>>> bigDf.unpersist()
>>>
>>>
>>> This is essentially an update of keys "1" and "50000" only, in a dataset
>>> of 1 million keys.
>>>
>>> This can be achieved efficiently if the join would preserve the cached
>>> blocks that have been unaffected, and only copy and mutate the 2 affected
>>> blocks corresponding to the matching join keys.
>>>
>>> Statistics can determine which blocks actually need mutating. Note also
>>> that shuffling is not required assuming both dataframes are pre-partitioned
>>> by the same key K.
>>>
>>> In SQL this could actually be expressed as an UPDATE statement or for a
>>> more generalized use as a MERGE UPDATE:
>>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>>
>>> While this may seem like a very special case optimization, it would
>>> effectively implement UPDATE support for cached DataFrames, for both
>>> optimal and non-optimal usage.
>>>
>>> I appreciate there's quite a lot here, so thank you for taking the time
>>> to consider it.
>>>
>>> Cristian
>>>
>>>
>>>
>>> On 12 November 2015 at 15:49, Cristian O <
>>> cristian.b.op...@googlemail.com> wrote:
>>>
>>>> Hi Reynold,
>>>>
>>>> Thanks for your reply.
>>>>
>>>> Parquet may very well be used as the underlying implementation, but
>>>> this is more than about a particular storage representation.
>>>>
>>>> There are a few things here that are inter-related and open different
>>>> possibilities, so it's hard to structure, but I'll give it a try:
>>>>
>>>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>>>> parquet, just using that as a checkpoint would currently require explicitly
>>>> reading it back. A proper checkpoint implementation would just save
>>>> (perhaps asynchronously) and prune the logical plan while allowing to
>>>> continue using the same DF, now backed by the checkpoint.
>>>>
>>>> It's important to prune the logical plan to avoid all kinds of issues
>>>> that may arise from unbounded expansion with iterative use-cases, like this
>>>> one I encountered recently:
>>>> https://issues.apache.org/jira/browse/SPARK-11596
>>>>
>>>> But really what I'm after here is:
>>>>
>>>> 2. Efficient updating of cached DataFrames - The main use case here is
>>>> keeping a relatively large dataset cached and updating it iteratively from
>>>> streaming. For example one would like to perform ad-hoc queries on an
>>>> incrementally updated, cached DataFrame. I expect this is already becoming
>>>> an increasingly common use case. Note that the dataset may require merging
>>>> (like adding) or overrriding values by key, so simply appending is not
>>>> sufficient.
>>>>
>>>> This is very similar in concept with updateStateByKey for regular RDDs,
>>>> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
>>>> level  (the row blocks for the columnar representation).
>>>>
>>>> This can be currently simulated with UNION or (OUTER) JOINs however is
>>>> very inefficient as it requires copying and recaching the entire dataset,
>>>> and unpersisting the original one. There are also the aforementioned
>>>> problems with unbounded logical plans (physical plans are fine)
>>>>
>>>> These two together, checkpointing and updating cached DataFrames, would
>>>> give fault-tolerant efficient updating of DataFrames, meaning streaming
>>>> apps can take advantage of the compact columnar representation and Tungsten
>>>> optimisations.
>>>>
>>>> I'm not quite sure if something like this can be achieved by other
>>>> means or has been investigated before, hence why I'm looking for feedback
>>>> here.
>>>>
>>>> While one could use external data stores, they would have the added IO
>>>> penalty, plus most of what's available at the moment is either HDFS
>>>> (extremely inefficient for updates) or key-value stores that have 5-10x
>>>> space overhead over columnar formats.
>>>>
>>>> Thanks,
>>>> Cristian
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com> wrote:
>>>>
>>>>> Thanks for the email. Can you explain what the difference is between
>>>>> this and existing formats such as Parquet/ORC?
>>>>>
>>>>>
>>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>>>> cristian.b.op...@googlemail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I was wondering if there's any planned support for local disk
>>>>>> columnar storage.
>>>>>>
>>>>>> This could be an extension of the in-memory columnar store, or
>>>>>> possibly something similar to the recently added local checkpointing for
>>>>>> RDDs
>>>>>>
>>>>>> This could also have the added benefit of enabling iterative usage
>>>>>> for DataFrames by pruning the query plan through local checkpoints.
>>>>>>
>>>>>> A further enhancement would be to add update support to the columnar
>>>>>> format (in the immutable copy-on-write sense of course), by maintaining
>>>>>> references to unchanged row blocks and only copying and mutating the ones
>>>>>> that have changed.
>>>>>>
>>>>>> A use case here is streaming and merging updates in a large dataset
>>>>>> that can be efficiently stored internally in a columnar format, rather 
>>>>>> than
>>>>>> accessing a more inefficient external  data store like HDFS or Cassandra.
>>>>>>
>>>>>> Thanks,
>>>>>> Cristian
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to