Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138813012
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider 
with RelationProvider
      * by setting the output committer class in the conf of 
spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    -    sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new 
CarbonStreamingOutputWriterFactory()
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
     
    -/**
    - * When possible, this method should return the schema of the given 
`files`.  When the format
    - * does not support inference, or no valid files are given should return 
None.  In these cases
    - * Spark will require that user specify the schema manually.
    - */
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    // Check id streaming data schema matches with carbon table schema
    +    // Data from socket source does not have schema attached to it,
    +    // Following check is to ignore schema validation for socket source.
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    --- End diff --
    
    Here is a bit of background  - schema validation is not mandatory but it 
provides early validation if input data schema doesn't match with target table 
schema. With spark's structured streaming, not all input sources provide 
schema. Only File sources will have schema attached to them. With file sources 
user can either provide their schema readStream.schema() or file source will 
have to infer the schema internally if spark.sql.streaming.inferSchema is set 
to true. However, for socket streaming source there is no real schema attached 
to it, everything comes as a byte stream and the schema for socket source is 
always of size 1 and of type StringType. We need to bypass schema validation 
for socket source. The schema validation happens on executor side and we don't 
have any information about input source( whether its file source or socket 
source). Executor only gets schema and data , hence to avoid schema validation 
for socket source we need this check. This may not be clean approach,
  but I could not find any better way to handle this. If you have any info, 
please let me know.


---

Reply via email to