FiloDB is also closely related: https://github.com/tuplejump/FiloDB
On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop input/output format support:
https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java

On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <r...@databricks.com> wrote:

This (updates) is something we are going to think about in the next release or two.

On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:

Sorry, apparently I only replied to Reynold when I meant to copy the list as well, so I'm self-replying and taking the opportunity to illustrate with an example.

Basically I want to conceptually do this:

val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")

bigDf.cache()

bigDf.registerTempTable("big")
deltaDf.registerTempTable("delta")

val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")

newBigDf.cache()
bigDf.unpersist()

This is essentially an update of keys "1" and "50000" only, in a dataset of 1 million keys.

It could be done efficiently if the join preserved the cached blocks that are unaffected and only copied and mutated the two affected blocks corresponding to the matching join keys.

Statistics can determine which blocks actually need mutating. Note also that no shuffle is required, assuming both DataFrames are pre-partitioned by the same key K.

In SQL this could be expressed as an UPDATE statement, or more generally as a MERGE UPDATE:
https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx

While this may seem like a very special-case optimization, it would effectively implement UPDATE support for cached DataFrames, for both optimal and non-optimal usage.

I appreciate there's quite a lot here, so thank you for taking the time to consider it.

Cristian

On 12 November 2015 at 15:49, Cristian O <cristian.b.op...@googlemail.com> wrote:

Hi Reynold,

Thanks for your reply.

Parquet may very well be used as the underlying implementation, but this is about more than a particular storage representation.

There are a few things here that are inter-related and open up different possibilities, so it's hard to structure, but I'll give it a try:

1. Checkpointing DataFrames - while a DF can be saved locally as parquet, just using that as a checkpoint currently requires explicitly reading it back. A proper checkpoint implementation would simply save (perhaps asynchronously) and prune the logical plan, while allowing the caller to continue using the same DF, now backed by the checkpoint.

It's important to prune the logical plan to avoid the kinds of issues that arise from its unbounded expansion in iterative use cases, like this one I encountered recently:
https://issues.apache.org/jira/browse/SPARK-11596
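For reference, a minimal sketch of the manual workaround described above, i.e. saving to parquet and explicitly reading it back so that the returned DataFrame carries only a scan of the saved files. It assumes the 1.5-era DataFrame API; the checkpointDF helper name and the path handling are illustrative only, not an existing or proposed API.

import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper, for illustration only: materialise the DataFrame to
// parquet and read it back. The re-read DataFrame's logical plan is just a
// scan of the saved files, so the lineage accumulated so far is dropped.
// Assumes a fresh path per call (no overwrite handling).
def checkpointDF(sqlContext: SQLContext, df: DataFrame, path: String): DataFrame = {
  df.write.parquet(path)
  sqlContext.read.parquet(path)
}

A proper checkpoint, as proposed above, would make this round trip transparent (and possibly asynchronous) instead of requiring the caller to save and re-read explicitly.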
But really what I'm after here is:

2. Efficient updating of cached DataFrames - The main use case here is keeping a relatively large dataset cached and updating it iteratively from streaming. For example, one would like to run ad-hoc queries on an incrementally updated, cached DataFrame. I expect this is already becoming an increasingly common use case. Note that the dataset may require merging (like adding) or overriding values by key, so simply appending is not sufficient.

This is very similar in concept to updateStateByKey for regular RDDs, i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch level (the row blocks of the columnar representation).

This can currently be simulated with UNION or (OUTER) JOINs, however that is very inefficient as it requires copying and re-caching the entire dataset and unpersisting the original one. There are also the aforementioned problems with unbounded logical plans (physical plans are fine).

These two together, checkpointing and updating cached DataFrames, would give fault-tolerant, efficient updating of DataFrames, meaning streaming apps could take advantage of the compact columnar representation and the Tungsten optimisations.

I'm not quite sure whether something like this can be achieved by other means or has been investigated before, hence why I'm looking for feedback here.

While one could use external data stores, they come with an added IO penalty, plus most of what's available at the moment is either HDFS (extremely inefficient for updates) or key-value stores with a 5-10x space overhead over columnar formats.

Thanks,
Cristian

On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com> wrote:

Thanks for the email. Can you explain what the difference is between this and existing formats such as Parquet/ORC?

On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:

Hi,

I was wondering if there's any planned support for local disk columnar storage.

This could be an extension of the in-memory columnar store, or possibly something similar to the recently added local checkpointing for RDDs.

It could also have the added benefit of enabling iterative usage of DataFrames by pruning the query plan through local checkpoints.

A further enhancement would be to add update support to the columnar format (in the immutable, copy-on-write sense of course), by maintaining references to unchanged row blocks and only copying and mutating the ones that have changed.

A use case here is streaming and merging updates into a large dataset that can be stored efficiently in an internal columnar format, rather than accessing a less efficient external data store like HDFS or Cassandra.

Thanks,
Cristian
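Putting the two ideas in this thread together, i.e. join-based updates of a cached DataFrame plus periodic checkpointing to keep the logical plan bounded, the usage pattern being discussed might look roughly like the sketch below. It reuses the hypothetical checkpointDF helper sketched earlier; applyDelta, runUpdates, checkpointDir and the checkpoint interval are likewise illustrative, both DataFrames are assumed to have columns (k, v) as in the example above, and the code still pays the full re-cache cost that the proposal aims to eliminate.

import org.apache.spark.sql.{DataFrame, SQLContext}

// Apply a keyed delta to the current state with the left outer join used
// earlier in the thread, then swap the cached copies.
def applyDelta(sqlContext: SQLContext, state: DataFrame, delta: DataFrame): DataFrame = {
  state.registerTempTable("big")
  delta.registerTempTable("delta")
  val updated = sqlContext.sql(
    "SELECT big.k, big.v + IF(delta.v IS NULL, 0, delta.v) AS v " +
    "FROM big LEFT JOIN delta ON big.k = delta.k")
  updated.cache()    // re-caches the entire dataset, the inefficiency discussed above
  state.unpersist()
  updated
}

// Fold a sequence of deltas into the cached state, checkpointing every few
// batches so the logical plan does not grow without bound (see SPARK-11596).
def runUpdates(sqlContext: SQLContext,
               initial: DataFrame,
               deltas: Seq[DataFrame],
               checkpointDir: String): DataFrame = {
  var state = initial.cache()
  var batch = 0
  for (delta <- deltas) {
    state = applyDelta(sqlContext, state, delta)
    batch += 1
    if (batch % 10 == 0) {
      val previous = state
      state = checkpointDF(sqlContext, state, s"$checkpointDir/state-$batch").cache()
      previous.unpersist()
    }
  }
  state
}

With the copy-on-write CachedBatch updates and transparent checkpointing described in the thread, applyDelta would touch only the affected row blocks and the explicit parquet round trip would disappear.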