I'm still not sure how the staging table helps for databases that don't
have such atomicity guarantees. For example, in Cassandra, if we wrote all
of the data temporarily to a staging table, we would still face the same
problem when moving the data from the staging table into the real table.
That move would have a similar chance of failing, and we would still have
no way of making the entire staging set visible all at once.
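
To make the concern concrete, the "promotion" step would itself look roughly
like this (sketch only, using the DataStax Java driver 3.x; the keyspace, table
and column names are made up). Each INSERT is an independent write, so a failure
mid-copy leaves a partially populated real table that readers can already see:

import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

object StagingCopySketch {
  def main(args: Array[String]): Unit = {
    val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    val session = cluster.connect("demo_ks")
    val insert = session.prepare(
      "INSERT INTO events (id, payload) VALUES (?, ?)")
    try {
      // No transaction spans this loop; there is no point at which the whole
      // staged batch becomes visible atomically.
      for (row <- session.execute("SELECT id, payload FROM events_staging").asScala) {
        session.execute(insert.bind(row.getUUID("id"), row.getString("payload")))
        // A crash here exposes a partial copy to readers.
      }
    } finally {
      cluster.close()
    }
  }
}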

On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan <ar...@apache.org> wrote:

> >Some say it is exactly-once when the output is eventually exactly-once,
> whereas others say there should be no side effects, e.g. a consumer
> shouldn't see a partial write. I guess 2PC is the former, since some
> partitions can commit earlier while other partitions fail to commit for
> some time.
> Yes, it's more about guaranteeing atomicity: either all partitions
> eventually commit or none do. The visibility of the data to readers is
> orthogonal (e.g. setting isolation levels like serializable for XA), and in
> general it's difficult to guarantee that data across partitions becomes
> visible all at once. An approach like a staging table plus a global commit
> works in a centralized setup but can be difficult to do in a distributed
> manner across partitions (e.g. when each partition's output goes to a
> different database).
>
> On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> IMHO that's up to how strict we would like to be about "exactly-once".
>>
>> Some say it is exactly-once when the output is eventually exactly-once,
>> whereas others say there should be no side effects, e.g. a consumer
>> shouldn't see a partial write. I guess 2PC is the former, since some
>> partitions can commit earlier while other partitions fail to commit for
>> some time.
>>
>> That said, there may be a couple of alternatives other than the contract
>> Spark provides/requires, and I'd like to see how the Spark community wants
>> to deal with them. Would we want to disallow alternatives like "replay +
>> deduplicate write (per batch/partition)", which ensures "eventually"
>> exactly-once but cannot ensure the contract?
>>
>> Btw, unless achieving exactly-once is cheap enough for a given sink, I
>> think the sink should provide both at-least-once (optimized for that
>> semantic) and exactly-once, and let end users pick one.
>>
>> On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer <russell.spit...@gmail.com>
>> wrote:
>>
>>> Why are atomic operations a requirement? I feel like doubling the amount
>>> of writes (with staging tables) is probably a tradeoff that the end user
>>> should make.
>>>
>>> On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>>> Regardless of the API, using Spark to write data atomically requires:
>>>> 1. Writing data distributedly, with a central coordinator at the Spark
>>>> driver.
>>>> 2. The distributed writers are not guaranteed to run at the same time.
>>>> (This can be relaxed if we extend the barrier scheduling feature.)
>>>> 3. The new data is visible if and only if all distributed writers
>>>> succeed.
>>>>
>>>> Given these requirements, I think using a staging table is the most
>>>> common way and maybe the only way. I'm not sure how 2PC helps here: we
>>>> don't want users to read partial data, so we need a final step that
>>>> commits all the data together.
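>>>>
>>>> Roughly, the shape would be something like this (sketch only -- the DSv2
>>>> signatures shifted a bit between 2.3 and 2.4, the JDBC-based final commit
>>>> and table names are just illustrative, and the task-side factory is
>>>> omitted):
>>>>
>>>> import java.sql.DriverManager
>>>> import org.apache.spark.sql.catalyst.InternalRow
>>>> import org.apache.spark.sql.sources.v2.writer._
>>>>
>>>> // Each task writes into its own staging table and reports it back.
>>>> case class StagedTable(name: String) extends WriterCommitMessage
>>>>
>>>> class StagingWriter(url: String, finalTable: String) extends DataSourceWriter {
>>>>   // Task side omitted: each DataWriter would fill "staging_<partition>_<attempt>"
>>>>   // and return StagedTable(name) from its commit().
>>>>   override def createWriterFactory(): DataWriterFactory[InternalRow] = ???
>>>>
>>>>   // Driver-side "final commit": publish every staging table in one
>>>>   // transaction, so all partitions become visible together or not at all.
>>>>   override def commit(messages: Array[WriterCommitMessage]): Unit = {
>>>>     val conn = DriverManager.getConnection(url)
>>>>     conn.setAutoCommit(false)
>>>>     try {
>>>>       messages.collect { case StagedTable(t) => t }.foreach { staging =>
>>>>         conn.createStatement().executeUpdate(
>>>>           s"INSERT INTO $finalTable SELECT * FROM $staging")
>>>>       }
>>>>       conn.commit()
>>>>     } catch {
>>>>       case e: Exception => conn.rollback(); throw e
>>>>     } finally conn.close()
>>>>   }
>>>>
>>>>   // On abort, we would drop whatever staging tables were reported.
>>>>   override def abort(messages: Array[WriterCommitMessage]): Unit = ()
>>>> }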
>>>>
>>>> For RDBMS data sources, I think a simple solution is to ask users to
>>>> coalesce the input RDD/DataFrame into one partition; then we don't need
>>>> to worry about multi-client transactions. Alternatively, use a staging
>>>> table as Ryan described before.
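>>>>
>>>> A minimal sketch of the coalesce route (the JDBC URL, table and
>>>> credentials are illustrative; as far as I understand, the built-in JDBC
>>>> writer commits once per partition, so a single partition means a single
>>>> client and a single transaction):
>>>>
>>>> import java.util.Properties
>>>> import org.apache.spark.sql.SparkSession
>>>>
>>>> object SinglePartitionWrite {
>>>>   def main(args: Array[String]): Unit = {
>>>>     val spark = SparkSession.builder().appName("single-partition-write").getOrCreate()
>>>>     val df = spark.range(0, 1000).toDF("id")
>>>>
>>>>     val props = new Properties()
>>>>     props.setProperty("user", "app")        // illustrative credentials
>>>>     props.setProperty("password", "secret")
>>>>
>>>>     df.coalesce(1)                           // all rows go through one task/client
>>>>       .write
>>>>       .mode("append")
>>>>       .jdbc("jdbc:postgresql://localhost/db", "target_table", props)
>>>>   }
>>>> }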
>>>>
>>>>
>>>>
>>>> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>
>>>>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require
>>>>> tasks to be running at the same time; we need a mechanism to take down
>>>>> tasks after they have prepared and bring them up again during the
>>>>> commit phase.
>>>>>
>>>>> I guess we've already gotten into too much detail here, but if it is
>>>>> based on client transactions, Spark must assign the "commit" task to
>>>>> the same executor where the "prepare" task finished, and if it loses
>>>>> that executor it is not feasible to force the commit. Staging would
>>>>> have to come into play for that.
>>>>>
>>>>> We would also need a mechanism for "recovery": Spark needs to ensure it
>>>>> finalizes the "commit", even in case of failures, before starting a new
>>>>> batch.
>>>>>
>>>>> So not an easy thing to integrate correctly.
>>>>>
>>>>> On Tue, Sep 11, 2018 at 6:00 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>>>>
>>>>>> >Well, in almost all relational databases you can move data in a
>>>>>> transactional way. That's what transactions are for.
>>>>>>
>>>>>> It would work, but I suspect in most cases it would involve moving
>>>>>> data from temporary tables to the final tables.
>>>>>>
>>>>>> Right now there is no mechanism to let the individual tasks commit in
>>>>>> a two-phase manner (not sure if the CommitCoordinator might help). If
>>>>>> such an API were provided, the sources could use it as they wish (e.g.
>>>>>> use the XA support provided by MySQL to implement it more efficiently
>>>>>> than having the driver move data from temp tables to destination
>>>>>> tables).
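>>>>>>
>>>>>> To make that concrete, a sketch of what task-level prepare plus
>>>>>> coordinator-driven commit could look like on top of javax.transaction.xa
>>>>>> (the MyXid class, table name and row shape are illustrative; pooling,
>>>>>> recovery and error handling are all omitted):
>>>>>>
>>>>>> import javax.sql.XAConnection
>>>>>> import javax.transaction.xa.{XAResource, Xid}
>>>>>>
>>>>>> // Minimal Xid implementation for the sketch; real code would derive the
>>>>>> // ids from something like (queryId, epochId, partitionId) so recovery
>>>>>> // can find in-doubt branches later.
>>>>>> case class MyXid(gtrid: Array[Byte], bqual: Array[Byte]) extends Xid {
>>>>>>   override def getFormatId: Int = 1
>>>>>>   override def getGlobalTransactionId: Array[Byte] = gtrid
>>>>>>   override def getBranchQualifier: Array[Byte] = bqual
>>>>>> }
>>>>>>
>>>>>> object XaSketch {
>>>>>>   // Phase 1, on each task: write the partition inside an XA branch, then PREPARE.
>>>>>>   def prepareBranch(xaConn: XAConnection, xid: Xid, rows: Seq[(Long, String)]): Unit = {
>>>>>>     val xa = xaConn.getXAResource
>>>>>>     xa.start(xid, XAResource.TMNOFLAGS)
>>>>>>     val stmt = xaConn.getConnection.prepareStatement(
>>>>>>       "INSERT INTO target_table (id, payload) VALUES (?, ?)")
>>>>>>     rows.foreach { case (id, payload) =>
>>>>>>       stmt.setLong(1, id); stmt.setString(2, payload); stmt.executeUpdate()
>>>>>>     }
>>>>>>     xa.end(xid, XAResource.TMSUCCESS)
>>>>>>     xa.prepare(xid)  // durable but not yet visible
>>>>>>   }
>>>>>>
>>>>>>   // Phase 2, driven by the coordinator (the driver) once every task has prepared.
>>>>>>   def commitBranch(xaConn: XAConnection, xid: Xid): Unit =
>>>>>>     xaConn.getXAResource.commit(xid, false)  // false = second phase of 2PC
>>>>>> }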
>>>>>>
>>>>>> There are definitely complexities involved, but I am not sure network
>>>>>> partitioning comes into play here, since the driver can act as the
>>>>>> coordinator and can run in HA mode. And regarding the issue that
>>>>>> Jungtaek brought up, 2PC doesn't require tasks to be running at the
>>>>>> same time; we need a mechanism to take down tasks after they have
>>>>>> prepared and bring them up again during the commit phase.
>>>>>>
>>>>>> Most of the sources would not need any of the above and would just
>>>>>> need a way to support idempotent writes, and like Ryan suggested we
>>>>>> can enable this (if there are gaps in the current APIs).
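>>>>>>
>>>>>> A sketch of the idempotent-write path (PostgreSQL-style upsert; the
>>>>>> table, key and row shape are illustrative): replays of a failed or
>>>>>> retried task overwrite the same rows, so the result is eventually
>>>>>> exactly-once without any global transaction.
>>>>>>
>>>>>> import java.sql.DriverManager
>>>>>>
>>>>>> object IdempotentWriteSketch {
>>>>>>   // Called per partition; safe to re-run because the write is keyed by id.
>>>>>>   def writePartition(jdbcUrl: String, rows: Iterator[(Long, String)]): Unit = {
>>>>>>     val conn = DriverManager.getConnection(jdbcUrl)
>>>>>>     try {
>>>>>>       val stmt = conn.prepareStatement(
>>>>>>         "INSERT INTO target_table (id, payload) VALUES (?, ?) " +
>>>>>>           "ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload")
>>>>>>       rows.foreach { case (id, payload) =>
>>>>>>         stmt.setLong(1, id); stmt.setString(2, payload); stmt.executeUpdate()
>>>>>>       }
>>>>>>     } finally conn.close()
>>>>>>   }
>>>>>> }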
>>>>>>
>>>>>>
>>>>>> On Mon, 10 Sep 2018 at 13:43, Reynold Xin <r...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Well, in almost all relational databases you can move data in a
>>>>>>> transactional way. That's what transactions are for.
>>>>>>>
>>>>>>> For just straight HDFS, the move is a pretty fast operation, so while
>>>>>>> it is not completely transactional, the window of potential failure
>>>>>>> is pretty short for appends. For writers at the partition level it is
>>>>>>> fine because it is just renaming a directory, which is atomic.
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim <kabh...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> When network partitioning happens, it is OK with me for 2PC not to
>>>>>>>> work, since we are dealing with a global transaction. Recovery would
>>>>>>>> be the hard thing to get right, though. I completely agree it would
>>>>>>>> require massive changes to Spark.
>>>>>>>>
>>>>>>>> What I couldn't find for the underlying storages is a way to move
>>>>>>>> data from a staging table to the final table transactionally. I'm
>>>>>>>> not fully sure, but as far as I'm aware, many storages do not
>>>>>>>> support moving data, and even for the HDFS sink it is not strictly
>>>>>>>> transactional, since we move multiple files with multiple
>>>>>>>> operations. If the coordinator crashes, it leaves a partial write,
>>>>>>>> and the writers and the coordinator need to ensure it does not end
>>>>>>>> up duplicated.
>>>>>>>>
>>>>>>>> Ryan replied to me that Iceberg and HBase MVCC timestamps can enable
>>>>>>>> us to implement "commit" (his reply didn't hit the dev mailing list,
>>>>>>>> though), but I'm not an expert in either of them and I still can't
>>>>>>>> see how they would deal with the various crash cases.
>>>>>>>>
>>>>>>>> On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> I don't think two phase commit would work here at all.
>>>>>>>>>
>>>>>>>>> 1. It'd require massive changes to Spark.
>>>>>>>>>
>>>>>>>>> 2. Unless the underlying data source can provide an API to
>>>>>>>>> coordinate commits (and few data sources I know of provide anything
>>>>>>>>> like that), 2PC wouldn't work in the presence of network
>>>>>>>>> partitioning. You can't defy the laws of physics.
>>>>>>>>>
>>>>>>>>> Really, the most common and simplest way I've seen this work is
>>>>>>>>> through staging tables and a final transaction that moves the data
>>>>>>>>> from the staging table to the final table.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim <kabh...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I guess we are all aware of the limitations of the contract on the
>>>>>>>>>> DSv2 writer. In practice it can be satisfied only by the HDFS sink
>>>>>>>>>> (or other filesystem-based sinks); for other external storage it
>>>>>>>>>> is normally not feasible to implement, because there's no way to
>>>>>>>>>> couple a transaction across multiple clients, and the coordinator
>>>>>>>>>> can't take over transactions from the writers to do the final
>>>>>>>>>> commit.
>>>>>>>>>>
>>>>>>>>>> XA is also not trivial to get right with the current execution
>>>>>>>>>> model: Spark doesn't require writer tasks to run at the same time,
>>>>>>>>>> but to achieve 2PC they would have to run until the end of the
>>>>>>>>>> transaction (closing a client before the transaction ends normally
>>>>>>>>>> means aborting the transaction). Spark would also have to
>>>>>>>>>> integrate 2PC with its checkpointing mechanism to guarantee the
>>>>>>>>>> completeness of a batch. And it might require a different
>>>>>>>>>> integration for continuous mode.
>>>>>>>>>>
>>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> In some cases the implementations may be OK with eventual
>>>>>>>>>>> consistency (and may not care whether the output is written out
>>>>>>>>>>> atomically).
>>>>>>>>>>>
>>>>>>>>>>> XA can be an option for data sources that support it and require
>>>>>>>>>>> atomicity, but I am not sure how one would implement it with the
>>>>>>>>>>> current API.
>>>>>>>>>>>
>>>>>>>>>>> Maybe we need to discuss improvements at the DataSource V2 API
>>>>>>>>>>> level (e.g. individual tasks would "prepare" for commit, and once
>>>>>>>>>>> the driver receives "prepared" from all the tasks, a "commit"
>>>>>>>>>>> would be invoked on each of the individual tasks). Right now the
>>>>>>>>>>> responsibility for the final "commit" lies with the driver, and
>>>>>>>>>>> it may not always be possible for the driver to take over the
>>>>>>>>>>> transactions started by the tasks.
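>>>>>>>>>>>
>>>>>>>>>>> Purely hypothetical shape of such an API, layered on the existing
>>>>>>>>>>> DSv2 writer interfaces (nothing like this exists today):
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
>>>>>>>>>>>
>>>>>>>>>>> trait TwoPhaseDataWriter[T] extends DataWriter[T] {
>>>>>>>>>>>   // Phase 1: make the task's output durable (e.g. XA prepare) and report it.
>>>>>>>>>>>   def prepare(): WriterCommitMessage
>>>>>>>>>>>   // Phase 2: the driver invokes commit() on each task (or a recovery
>>>>>>>>>>>   // path) only after it has collected "prepared" from every task;
>>>>>>>>>>>   // otherwise it invokes abort() everywhere.
>>>>>>>>>>> }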
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal <dbis...@us.ibm.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This is a pretty big challenge in general for data sources --
>>>>>>>>>>>> for the vast majority of data stores, the boundary of a 
>>>>>>>>>>>> transaction is per
>>>>>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>>>>>> coordinating a
>>>>>>>>>>>> single transaction. That's certainly the case for almost all 
>>>>>>>>>>>> relational
>>>>>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>>>>>> (consider
>>>>>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>>>>>
>>>>>>>>>>>> DB>> Perhaps we can explore the two-phase commit protocol (aka
>>>>>>>>>>>> XA) for this? Not sure how easy it would be to implement, though :-)
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Dilip Biswal
>>>>>>>>>>>> Tel: 408-463-4980
>>>>>>>>>>>> dbis...@us.ibm.com
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ----- Original message -----
>>>>>>>>>>>> From: Reynold Xin <r...@databricks.com>
>>>>>>>>>>>> To: Ryan Blue <rb...@netflix.com>
>>>>>>>>>>>> Cc: ross.law...@gmail.com, dev <dev@spark.apache.org>
>>>>>>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>>>>>>
>>>>>>>>>>>> I don't think the problem is just whether we have a starting
>>>>>>>>>>>> point for the write. As a matter of fact, there's always a
>>>>>>>>>>>> starting point for a write, whether it is explicit or implicit.
>>>>>>>>>>>>
>>>>>>>>>>>> This is a pretty big challenge in general for data sources --
>>>>>>>>>>>> for the vast majority of data stores, the boundary of a 
>>>>>>>>>>>> transaction is per
>>>>>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>>>>>> coordinating a
>>>>>>>>>>>> single transaction. That's certainly the case for almost all 
>>>>>>>>>>>> relational
>>>>>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>>>>>> (consider
>>>>>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue <rb...@netflix.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Ross, I think the intent is to create a single transaction on
>>>>>>>>>>>> the driver, write as part of it in each task, and then commit the
>>>>>>>>>>>> transaction once the tasks complete. Is that possible in your
>>>>>>>>>>>> implementation?
>>>>>>>>>>>>
>>>>>>>>>>>> I think that part of this is made more difficult by not having
>>>>>>>>>>>> a clear starting point for a write, which we are fixing in the 
>>>>>>>>>>>> redesign of
>>>>>>>>>>>> the v2 API. The new API will have a method that creates a Write
>>>>>>>>>>>> to track the operation; that Write can create your transaction
>>>>>>>>>>>> when it is created and commit the transaction when commit is
>>>>>>>>>>>> called on it.
>>>>>>>>>>>>
>>>>>>>>>>>> rb
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin <
>>>>>>>>>>>> r...@databricks.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley <
>>>>>>>>>>>> ross.law...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I've been prototyping an implementation of the DataSource V2
>>>>>>>>>>>> writer for the MongoDB Spark Connector, and I have a couple of
>>>>>>>>>>>> questions about how it's intended to be used with database
>>>>>>>>>>>> systems. According to the Javadoc for DataWriter.commit():
>>>>>>>>>>>>
>>>>>>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>>>>>>> WriterCommitMessage"*
>>>>>>>>>>>>
>>>>>>>>>>>> Although MongoDB now has transactions, it doesn't have a way to
>>>>>>>>>>>> "hide" the data once it has been written. So as soon as the
>>>>>>>>>>>> DataWriter has committed the data, it has been inserted/updated
>>>>>>>>>>>> in the collection and is discoverable - thereby breaking the
>>>>>>>>>>>> documented contract.
>>>>>>>>>>>>
>>>>>>>>>>>> I was wondering how other database systems plan to implement
>>>>>>>>>>>> this API and meet the contract as per the Javadoc?
>>>>>>>>>>>>
>>>>>>>>>>>> Many thanks
>>>>>>>>>>>>
>>>>>>>>>>>> Ross
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>> Netflix
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
