I want to bring back the discussion of data source v2 abstraction.

Hyukjin recently discovered a problem. A write-only data source may accept
any input and have no schema of its own. The table abstraction doesn't fit
such a source, because a table must provide a schema.

Personally I think this is a corner case. Who would develop a data source
like that? If users do have this requirement, maybe they can just implement
a table with an empty schema, and Spark's append operator can skip input
schema validation when the table schema is empty.
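
A minimal sketch of that idea, assuming simplified stand-in types. `Field`,
`Schema`, `Table` and `validateAppend` are hypothetical names used only for
illustration; they are not Spark's actual classes or its real append
validation logic.

```scala
object EmptySchemaAppendSketch {
  // Hypothetical, simplified stand-ins; NOT Spark's StructType/Table.
  final case class Field(name: String, dataType: String)

  final case class Schema(fields: Seq[Field]) {
    def isEmpty: Boolean = fields.isEmpty
  }

  // Every table reports a schema; a write-only sink reports an empty one.
  trait Table {
    def schema: Schema
  }

  // Right(()) when the append may proceed, Left(reason) otherwise.
  def validateAppend(table: Table, input: Schema): Either[String, Unit] =
    if (table.schema.isEmpty) Right(()) // empty table schema: accept any input
    else if (table.schema == input) Right(())
    else Left(s"input schema ${input.fields} does not match table schema ${table.schema.fields}")

  // A write-only sink that accepts anything.
  val acceptAnything: Table = new Table {
    def schema: Schema = Schema(Nil)
  }

  // An ordinary table with a fixed schema.
  val typed: Table = new Table {
    def schema: Schema = Schema(Seq(Field("id", "long")))
  }
}
```

With this shape the write-only source still fits the table abstraction, and
ordinary tables keep strict validation.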

Any thoughts?

Thanks,
Wenchen

On Thu, Sep 20, 2018 at 4:51 AM Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Thanks for the info Ryan – very helpful!
>
>
>
> *From: *Ryan Blue <rb...@netflix.com>
> *Reply-To: *"rb...@netflix.com" <rb...@netflix.com>
> *Date: *Wednesday, September 19, 2018 at 3:17 PM
> *To: *"Thakrar, Jayesh" <jthak...@conversantmedia.com>
> *Cc: *Wenchen Fan <cloud0...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>,
> Spark Dev List <dev@spark.apache.org>
> *Subject: *Re: data source api v2 refactoring
>
>
>
> Hi Jayesh,
>
>
>
> The existing sources haven't been ported to v2 yet. That is going to be
> tricky because the existing sources implement behaviors that we need to
> keep for now.
>
>
>
> I wrote up an SPIP to standardize logical plans while moving to the v2
> sources. The reason why we need this is that too much is delegated to
> sources today. For example, sources are handed a SaveMode to overwrite
> data, but what exactly gets overwritten isn't defined and it varies by the
> source that gets used. That's not a good thing and we want to clean up what
> happens so that users know that a query behaves the same way across all v2
> sources. CTAS shouldn't succeed for one source but fail for another if the
> table already exists.
>
>
>
> Standardizing plans makes it difficult to port the existing sources to v2
> because we need to implement the behavior of the v2 plans, which may not be
> the existing v1 behavior. I think what we should do is keep the existing v1
> sources working as they do today, and add a way to opt in for v2 behavior.
> One good way to do this is to use a new write API that is more clear; I
> proposed one in the SPIP I mentioned earlier. SQL is a bit easier because
> the behavior for SQL is fairly well-defined. The problem is mostly with the
> existing DF write API, DataFrameWriter.
>
>
>
> It would be great to open a discussion about the compatibility between v1
> and v2 and come up with a plan on this list.
>
>
>
> rb
>
>
>
> On Fri, Sep 7, 2018 at 2:12 PM Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> Ryan et al,
>
>
>
> Wondering if the existing Spark-based data sources (e.g. for HDFS, Kafka)
> have been ported to V2.
>
> I remember reading threads that discussed the inefficiency/overhead of
> converting from Row to InternalRow, which was holding back some of the
> porting effort.
>
>
>
> I ask because those are the most widely used data sources and have a lot
> of effort and thinking behind them, and if they have been ported over to
> V2, then they can serve as excellent production examples of the V2 API.
>
>
>
> Thanks,
>
> Jayesh
>
>
>
> *From: *Ryan Blue <rb...@netflix.com.INVALID>
> *Reply-To: *<rb...@netflix.com>
> *Date: *Friday, September 7, 2018 at 2:19 PM
> *To: *Wenchen Fan <cloud0...@gmail.com>
> *Cc: *Hyukjin Kwon <gurwls...@gmail.com>, Spark Dev List <
> dev@spark.apache.org>
> *Subject: *Re: data source api v2 refactoring
>
>
>
> There are a few v2-related changes that we can work on in parallel, at
> least for reviews:
>
>
>
> * SPARK-25006, #21978 <https://github.com/apache/spark/pull/21978>: Add
> catalog to TableIdentifier - this proposes how to incrementally add
> multi-catalog support without breaking existing code paths
>
> * SPARK-24253, #21308 <https://github.com/apache/spark/pull/21308>: Add
> DeleteSupport API - this is a small API addition, which doesn't affect the
> refactor
>
> * SPARK-24252, #21306 <https://github.com/apache/spark/pull/21306>: Add
> v2 Catalog API - this is a different way to create v2 tables, also doesn't
> affect the refactor
>
>
>
> I agree that the PR for adding SQL support should probably wait on the
> refactor. I have also been meaning to share our implementation, which isn't
> based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and
> AlterTable from both SQL and the other methods in the DF API, saveAsTable
> and insertInto. It follows the structure that I proposed on the SQL support
> PR to convert SQL plans to v2 plans and uses the new TableCatalog to
> implement CTAS and RTAS.
>
>
>
> rb
>
>
>
>
>
> On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>
> Hi Ryan,
>
>
>
> You are right that the `LogicalWrite` mirrors the read side API. I just
> don't have a good name for it yet, and the write side changes will be a
> different PR.
>
>
>
>
>
> Hi Hyukjin,
>
>
>
> That's my expectation; otherwise we would keep rebasing the refactor PR
> and never get it done.
>
>
>
> On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:
>
> BTW, just to clarify: are we holding Data Source V2 related PRs for now,
> until this refactoring is finished?
>
>
>
> On Fri, Sep 7, 2018 at 12:52 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
> Wenchen,
>
>
>
> I'm not really sure what you're proposing here. What is a `LogicalWrite`?
> Is it something that mirrors the read side in your PR?
>
>
>
> I think that I agree that if we have a Write independent of the Table that
> carries the commit and abort methods, then we can create it directly
> without a WriteConfig. So I tentatively agree with what you propose,
> assuming that I understand it correctly.
>
>
>
> rb
>
>
>
> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>
> I'm switching to another Gmail account of mine; let's see if it still gets
> dropped this time.
>
>
>
> Hi Ryan,
>
>
>
> I'm thinking about the write path and feel the abstraction should be the
> same.
>
>
>
> We still have logical and physical writes, and the table can create
> different logical writes depending on how the data is written, e.g.
> append, delete, replaceWhere, etc.
>
>
>
> One thing I'm not sure about is the WriteConfig. With the WriteConfig, the
> API would look like
>
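> trait Table {
>
>   def newAppendWriteConfig(): WriteConfig
>
>   // The type of deleteExprs is not given in this thread; Seq[Expression]
>   // is assumed here.
>   def newDeleteWriteConfig(deleteExprs: Seq[Expression]): WriteConfig
>
>   def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite
>
> }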
>
>
>
> Without WriteConfig, the API looks like
>
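> trait Table {
>
>   def newAppendWrite(): LogicalWrite
>
>   // The type of deleteExprs is not given in this thread; Seq[Expression]
>   // is assumed here.
>   def newDeleteWrite(deleteExprs: Seq[Expression]): LogicalWrite
>
> }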
>
>
>
>
>
> It looks to me that the API is simpler without WriteConfig. What do you
> think?
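
A rough, self-contained sketch of the variant without WriteConfig, assuming
that the logical write itself carries the commit and abort methods. `Expr`,
`InMemoryTable`, `description` and the `committed` log are hypothetical
names added only to make the example runnable; none of this is the actual
Spark API.

```scala
object WriteApiSketch {
  // Hypothetical placeholder for a delete predicate.
  final case class Expr(sql: String)

  // The logical write is created directly by the table and owns commit/abort.
  trait LogicalWrite {
    def description: String
    def commit(): Unit
    def abort(): Unit
  }

  trait Table {
    def newAppendWrite(): LogicalWrite
    def newDeleteWrite(deleteExprs: Seq[Expr]): LogicalWrite
  }

  // Toy table that only records which writes were committed.
  final class InMemoryTable extends Table {
    val committed = scala.collection.mutable.ArrayBuffer.empty[String]

    private def write(desc: String): LogicalWrite = new LogicalWrite {
      def description: String = desc
      def commit(): Unit = { committed += desc; () }
      def abort(): Unit = () // nothing was staged, so nothing to undo
    }

    def newAppendWrite(): LogicalWrite = write("append")

    def newDeleteWrite(deleteExprs: Seq[Expr]): LogicalWrite =
      write("delete where " + deleteExprs.map(_.sql).mkString(" AND "))
  }
}
```

The caller goes from the table to a write in one step; there is no
intermediate config object to build and pass back to the table.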
>
>
>
> Thanks,
>
> Wenchen
>
>
>
> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
> Latest from Wenchen in case it was dropped.
>
> ---------- Forwarded message ---------
> From: *Wenchen Fan* <wenc...@databricks.com>
> Date: Mon, Sep 3, 2018 at 6:16 AM
> Subject: Re: data source api v2 refactoring
> To: <mri...@gmail.com>
> Cc: Ryan Blue <rb...@netflix.com>, Reynold Xin <r...@databricks.com>, <
> dev@spark.apache.org>
>
>
>
> Hi Mridul,
>
>
>
> I'm not sure what's going on, my email was CC'ed to the dev list.
>
>
>
>
>
> Hi Ryan,
>
>
>
> The logical and physical scan idea sounds good. To add more color
> to Jungtaek's question, both micro-batch and continuous mode have
> the logical and physical scan, but there is a difference: for micro-batch
> mode, a physical scan outputs data for one epoch, but it's not true for
> continuous mode.
>
>
>
> I'm not sure if it's necessary to include streaming epoch in the API
> abstraction, for features like metrics reporting.
>
>
>
> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>
>
> Is it only me, or are all the others getting Wenchen's mails? (Obviously
> Ryan did :-) )
>
> I did not see it in the mail thread I received or in archives ... [1]
> Wondering which other senders were getting dropped (if yes).
>
>
>
> Regards
>
> Mridul
>
>
>
> [1]
> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>
>
>
>
>
> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
> Thanks for clarifying, Wenchen. I think that's what I expected.
>
>
>
> As for the abstraction, here's the way that I think about it: there are
> two important parts of a scan: the definition of what will be read, and
> task sets that actually perform the read. In batch, there's one definition
> of the scan and one task set so it makes sense that there's one scan object
> that encapsulates both of these concepts. For streaming, we need to
> separate the two into the definition of what will be read (the stream or
> streaming read) and the task sets that are run (scans). That way, the
> streaming read behaves like a factory for scans, producing scans that
> handle the data either in micro-batches or using continuous tasks.
>
>
>
> To address Jungtaek's question, I think that this does work with
> continuous. In continuous mode, the query operators keep running and send
> data to one another directly. The API still needs a streaming read layer
> because it may still produce more than one continuous scan. That would
> happen when the underlying source changes and Spark needs to reconfigure. I
> think the example here is when partitioning in a Kafka topic changes and
> Spark needs to re-map Kafka partitions to continuous tasks.
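
The "streaming read as a factory for scans" idea above can be sketched
roughly as follows. All names here (`Offset`, `Scan`, `StreamingRead`,
`reconfigure`) are hypothetical and chosen only for illustration; the
partition count stands in for whatever source layout Spark has to re-map.

```scala
object StreamFactorySketch {
  final case class Offset(value: Long)

  // A scan is the task set that actually performs a read.
  // end = None models an unbounded continuous scan.
  final case class Scan(partitions: Int, start: Offset, end: Option[Offset])

  // The streaming read is the definition of what is read; it produces scans.
  final class StreamingRead(initialPartitions: Int) {
    private var partitions = initialPartitions

    // Micro-batch mode: one bounded scan per epoch.
    def createMicroBatchScan(start: Offset, end: Offset): Scan =
      Scan(partitions, start, Some(end))

    // Continuous mode: one long-running scan, until the source changes.
    def createContinuousScan(start: Offset): Scan =
      Scan(partitions, start, None)

    // E.g. the partitioning of a Kafka topic changed, so a new continuous
    // scan has to be created from the same streaming read.
    def reconfigure(newPartitions: Int): Unit = { partitions = newPartitions }
  }
}
```

Even in continuous mode the same streaming read may hand out more than one
scan over its lifetime, which is why the extra layer is still needed.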
>
>
>
> rb
>
>
>
> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com>
> wrote:
>
> Hi Ryan,
>
>
>
> Sorry, I may have used the wrong wording. The pushdown is done with
> ScanConfig, which is not table/stream/scan, but something between them.
> The table creates the ScanConfigBuilder, and the table creates the
> stream/scan with the ScanConfig. For a streaming source, the stream is the
> one that takes care of the pushdown result. For a batch source, it's the
> scan.
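
A minimal sketch of that flow, assuming a much simplified builder. The
method names (`newScanConfigBuilder`, `pushFilters`, `pruneColumns`,
`build`) and the `Filter` type are illustrative assumptions, not the
committed API.

```scala
object PushdownSketch {
  final case class Filter(column: String, value: String)

  // The pushdown result: sits between the table and the stream/scan.
  final case class ScanConfig(pushedFilters: Seq[Filter], requiredColumns: Seq[String])

  final class ScanConfigBuilder(filterableColumns: Set[String]) {
    private var pushed = Vector.empty[Filter]
    private var columns = Seq.empty[String]

    // Accepts the filters the source can evaluate and returns the rest,
    // which the engine would have to evaluate itself.
    def pushFilters(filters: Seq[Filter]): Seq[Filter] = {
      val (accepted, residual) = filters.partition(f => filterableColumns.contains(f.column))
      pushed = pushed ++ accepted
      residual
    }

    def pruneColumns(cols: Seq[String]): Unit = { columns = cols }

    def build(): ScanConfig = ScanConfig(pushed, columns)
  }

  final case class Scan(config: ScanConfig)   // batch: the scan holds the result
  final case class Stream(config: ScanConfig) // streaming: the stream holds it

  final class Table(filterableColumns: Set[String]) {
    def newScanConfigBuilder(): ScanConfigBuilder = new ScanConfigBuilder(filterableColumns)
    def createScan(config: ScanConfig): Scan = Scan(config)
    def createStream(config: ScanConfig): Stream = Stream(config)
  }
}
```

Because both the builder and the stream/scan come from the table, batch and
streaming reads of the same table get the same pushdown behavior.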
>
>
>
> It's a little tricky because the stream is an abstraction for streaming
> sources only. Better ideas are welcome!
>
>
>
> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>
> Thanks, Reynold!
>
>
>
> I think your API sketch looks great. I appreciate having the Table level
> in the abstraction to plug into as well. I think this makes it clear what
> everything does, particularly having the Stream level that represents a
> configured (by ScanConfig) streaming read and can act as a factory for
> individual batch scans or for continuous scans.
>
>
>
> Wenchen, I'm not sure what you mean by doing pushdown at the table level.
> It seems to mean that pushdown is specific to a batch scan or streaming
> read, which seems to be what you're saying as well. Wouldn't the pushdown
> happen to create a ScanConfig, which is then used as Reynold suggests?
> Looking forward to seeing this PR when you get it posted. Thanks for all of
> your work on this!
>
>
>
> rb
>
>
>
> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com>
> wrote:
>
> Thanks, Reynold, for writing this and starting the discussion!
>
>
>
> Data source v2 was started with batch only, so we didn't pay much
> attention to the abstraction and just followed the v1 API. Now that we are
> designing the streaming API and catalog integration, the abstraction
> becomes very important.
>
>
>
> I like this proposed abstraction and have successfully prototyped it to
> make sure it works.
>
>
>
> During prototyping, I had to work around the issue that the current
> streaming engine does query optimization/planning for each micro-batch.
> With this abstraction, the operator pushdown is only applied once per
> query. In my prototype, I do the physical planning up front to get the
> pushdown result, add a logical linking node that wraps the resulting
> physical plan node for the data source, and then swap that logical linking
> node into the logical plan for each batch. In the future we should just let
> the streaming engine do query optimization/planning only once.
>
>
>
> About pushdown, I think we should do it at the table level. The table
> should create a new pushdown handler to apply operator pushdown for each
> scan/stream, and create the scan/stream with the pushdown result. The
> rationale is that a table should have the same pushdown behavior
> regardless of the scan node.
>
>
>
> Thanks,
>
> Wenchen
>
>
>
>
>
>
>
>
>
>
>
> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:
>
> I spent some time last week looking at the current data source v2 apis,
> and I thought we should be a bit more buttoned up in terms of the
> abstractions and the guarantees Spark provides. In particular, I feel we
> need the following levels of "abstractions", to fit the use cases in Spark,
> from batch, to streaming.
>
>
>
> Please don't focus on the naming at this stage. When possible, I draw
> parallels to what similar levels are named in the currently committed api:
>
>
>
> 0. Format: This represents a specific format, e.g. Parquet, ORC. There is
> currently no explicit class at this level.
>
>
>
> 1. Table: This should represent a logical dataset (with schema). This
> could be just a directory on the file system, or a table in the catalog.
> Operations on tables can include batch reads (Scan), streams, writes, and
> potentially other operations such as deletes. The closest to the table
> level abstraction in the current code base is the "Provider" class,
> although Provider isn't quite a Table. This is similar to Ryan's proposed
> design.
>
>
>
> 2. Stream: Specific to streaming. A stream is created out of a Table. This
> logically represents an instance of a StreamingQuery. Pushdowns and
> options are handled at this layer, i.e. Spark guarantees to the data
> source implementation that pushdowns and options don't change within a
> Stream. Each Stream consists of a sequence of scans. There is no
> equivalent concept in the current committed code.
>
>
>
> 3. Scan: A physical scan -- either as part of a streaming query, or a
> batch query. This should contain sufficient information and methods so we
> can run a Spark job over a defined subset of the table. It's functionally
> equivalent to an RDD, except there's no dependency on RDD so it is a
> smaller surface. In the current code, the equivalent class would be the
> ScanConfig, which represents the information needed, but in order to
> execute a job, ReadSupport is needed (various methods in ReadSupport take
> a ScanConfig).
>
>
>
>
>
> To illustrate with pseudocode what the different levels mean, a batch
> query would look like the following:
>
>
>
> val provider = reflection[Format]("parquet")
>
> val table = provider.createTable(options)
>
> // scanConfig includes pushdown and options
> val scan = table.createScan(scanConfig)
>
> // run tasks on executors
>
>
>
> A streaming micro-batch scan would look like the following:
>
>
>
> val provider = reflection[Format]("parquet")
>
> val table = provider.createTable(options)
>
> val stream = table.createStream(scanConfig)
>
>
>
> while(true) {
>
>   val scan = stream.createScan(startOffset)
>
>   // run tasks on executors
>
> }
>
>
>
>
>
> Vs the current API, the above:
>
>
>
> 1. Creates an explicit Table abstraction, and an explicit Scan abstraction.
>
>
>
> 2. Has an explicit Stream level and makes it clear that pushdowns and
> options are handled there, rather than at the individual scan
> (ReadSupport) level. Data source implementations don't need to worry about
> pushdowns or options changing mid-stream. For batch, those happen when the
> scan object is created.
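
To make the levels concrete, here is a small runnable sketch of the
Format, Table, Stream and Scan chain. It is only an illustration under
assumed signatures: the class shapes, the `runMicroBatches` helper, and the
use of plain `Long` offsets are all hypothetical, and the infinite
`while(true)` loop is replaced by a bounded list of offsets.

```scala
object LevelsSketch {
  // Pushdown result plus options, fixed when a scan or stream is created.
  final case class ScanConfig(pushedFilters: Seq[String], options: Map[String, String])

  // Level 3: a physical scan over a defined subset of the table.
  final case class Scan(config: ScanConfig, startOffset: Option[Long])

  // Level 2: the config is fixed for the lifetime of the stream.
  final class Stream(val config: ScanConfig) {
    def createScan(startOffset: Long): Scan = Scan(config, Some(startOffset))
  }

  // Level 1: a logical dataset that can produce batch scans and streams.
  final class Table(val options: Map[String, String]) {
    def createScan(config: ScanConfig): Scan = Scan(config, None)
    def createStream(config: ScanConfig): Stream = new Stream(config)
  }

  // Level 0: a specific format such as Parquet or ORC.
  final class Format(val name: String) {
    def createTable(options: Map[String, String]): Table = new Table(options)
  }

  // Bounded stand-in for the micro-batch loop: one scan per start offset.
  def runMicroBatches(stream: Stream, startOffsets: Seq[Long]): Seq[Scan] =
    startOffsets.map(offset => stream.createScan(offset))
}
```

Every scan produced by a stream shares the stream's config, which is the
guarantee that pushdowns and options cannot change mid-stream.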
>
>
>
>
>
>
>
> This email is just a high level sketch. I've asked Wenchen to prototype
> this, to see if it is actually feasible and the degree of hacks it removes,
> or creates.
>
>
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>
