Hi Hyukjin, can you open a PR to revert it from 2.4? Now I'm kind of
convinced this is too breaking and we need more discussion.

+ Ryan Blue
Hi Ryan,
I think we need to look back at the new write API design and consider data
sources that don't have table concept. We should opt-in for the schema
validation of append operator.

On Thu, Oct 11, 2018 at 8:12 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:

> That's why I initially suggested to revert this part out of Spark 2.4 and
> have more discussion at 3.0 since one of the design goal of Data source V2
> is no behaviour changes to end users.
>
> 2018년 10월 11일 (목) 오후 7:11, Mendelson, Assaf <assaf.mendel...@rsa.com>님이
> 작성:
>
>> Actually, it is not just a question of a write only data source. The
>> issue is that in my case (and I imagine this is true for others), the
>> schema is not read from the database but is understood from the options.
>> This means that I have no way of understanding the schema without supplying
>> the read options. On the other hand, when writing, I have the schema from
>> the dataframe.
>>
>>
>>
>> I know the data source V2 API is considered experimental API and I have
>> no problem with it, however, this means that the change will require a
>> change in how the end user works with it (they suddenly need to add schema
>> information which they did not before), not to mention this being a
>> regression.
>>
>>
>>
>> As to the pull request, this only handles cases where the save mode is
>> not append, for the original example (having non existent path but have
>> append will still fail and according to the documentation of Append, if the
>> path does not exist it should create it).
>>
>>
>>
>> I am currently having problem compiling everything so I can’t test it
>> myself but wouldn’t changing the relation definition in “save”:
>>
>>
>>
>> val relation = DataSourceV2Relation.create(source, options, None,
>> Option(df.schema))
>>
>>
>>
>> and changing create to look like this:
>>
>>
>>
>> def create(source: DataSourceV2, options: Map[String, String],
>> tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema:
>> Option[StructType] = None): DataSourceV2Relation = {
>>
>>     val schema =
>> userSpecifiedSchema.getOrElse(source.createReader(options,
>> userSpecifiedSchema).readSchema())
>>
>>     val ident = tableIdent.orElse(tableFromOptions(options))
>>
>>     DataSourceV2Relation(
>>
>>       source, schema.toAttributes, options, ident, userSpecifiedSchema)
>>
>>   }
>>
>>
>>
>> Correct this?
>>
>>
>>
>> Or even creating a new create which simply gets the schema as non
>> optional?
>>
>>
>>
>> Thanks,
>>
>>         Assaf
>>
>>
>>
>> *From:* Hyukjin Kwon [mailto:gurwls...@gmail.com]
>> *Sent:* Thursday, October 11, 2018 10:24 AM
>> *To:* Mendelson, Assaf; Wenchen Fan
>> *Cc:* dev
>> *Subject:* Re: Possible bug in DatasourceV2
>>
>>
>>
>> [EXTERNAL EMAIL]
>> Please report any suspicious attachments, links, or requests for
>> sensitive information.
>>
>> See https://github.com/apache/spark/pull/22688
>>
>>
>>
>> +WEnchen, here looks the problem raised. This might have to be considered
>> as a blocker ...
>>
>> On Thu, 11 Oct 2018, 2:48 pm assaf.mendelson, <assaf.mendel...@rsa.com>
>> wrote:
>>
>> Hi,
>>
>> I created a datasource writer WITHOUT a reader. When I do, I get an
>> exception: org.apache.spark.sql.AnalysisException: Data source is not
>> readable: DefaultSource
>>
>> The reason for this is that when save is called, inside the source match
>> to
>> WriterSupport we have the following code:
>>
>> val source = cls.newInstance().asInstanceOf[DataSourceV2]
>>       source match {
>>         case ws: WriteSupport =>
>>           val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
>>             source,
>>             df.sparkSession.sessionState.conf)
>>           val options = sessionOptions ++ extraOptions
>> -->      val relation = DataSourceV2Relation.create(source, options)
>>
>>           if (mode == SaveMode.Append) {
>>             runCommand(df.sparkSession, "save") {
>>               AppendData.byName(relation, df.logicalPlan)
>>             }
>>
>>           } else {
>>             val writer = ws.createWriter(
>>               UUID.randomUUID.toString,
>> df.logicalPlan.output.toStructType,
>> mode,
>>               new DataSourceOptions(options.asJava))
>>
>>             if (writer.isPresent) {
>>               runCommand(df.sparkSession, "save") {
>>                 WriteToDataSourceV2(writer.get, df.logicalPlan)
>>               }
>>             }
>>           }
>>
>> but DataSourceV2Relation.create actively creates a reader
>> (source.createReader) to extract the schema:
>>
>> def create(
>>       source: DataSourceV2,
>>       options: Map[String, String],
>>       tableIdent: Option[TableIdentifier] = None,
>>       userSpecifiedSchema: Option[StructType] = None):
>> DataSourceV2Relation
>> = {
>>     val reader = source.createReader(options, userSpecifiedSchema)
>>     val ident = tableIdent.orElse(tableFromOptions(options))
>>     DataSourceV2Relation(
>>       source, reader.readSchema().toAttributes, options, ident,
>> userSpecifiedSchema)
>>   }
>>
>>
>> This makes me a little confused.
>>
>> First, the schema is defined by the dataframe itself, not by the data
>> source, i.e. it should be extracted from df.schema and not by
>> source.createReader
>>
>> Second, I see that relation is actually only use if the mode is
>> SaveMode.append (btw this means if it is needed it should be defined
>> inside
>> the "if"). I am not sure I understand the portion of the AppendData but
>> why
>> would reading from the source be included?
>>
>> Am I missing something here?
>>
>> Thanks,
>>    Assaf
>>
>>
>>
>> --
>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>

Reply via email to