Github user aniketadnaik commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138813012
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider
with RelationProvider
* by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
*/
def prepareWrite(
- sparkSession: SparkSession,
- job: Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
-/**
- * When possible, this method should return the schema of the given `files`. When the format
- * does not support inference, or no valid files are given should return None. In these cases
- * Spark will require that user specify the schema manually.
- */
+ // Check if table with given path exists
+ validateTable(options.get("path").get)
+
+ // Check if streaming data schema matches with carbon table schema
+ // Data from socket source does not have schema attached to it,
+ // Following check is to ignore schema validation for socket source.
+ if (!(dataSchema.size.equals(1) &&
+ dataSchema.fields(0).dataType.equals(StringType))) {
--- End diff --
Here is a bit of background: schema validation is not mandatory, but it provides early validation when the input data schema doesn't match the target table schema. With Spark's structured streaming, not all input sources provide a schema; only file sources have one attached. For a file source the user can either supply a schema via readStream.schema(), or the source will infer it internally if spark.sql.streaming.inferSchema is set to true.

A socket streaming source, however, has no real schema attached to it: everything arrives as a byte stream, and its schema is always of size 1 with type StringType. We therefore need to bypass schema validation for socket sources. The validation happens on the executor side, where we have no information about the input source (whether it is a file source or a socket source); the executor only gets the schema and the data, hence this check to skip validation for socket sources.

This may not be a clean approach, but I could not find a better way to handle it. If you have any info, please let me know.
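To make the guard's intent concrete, here is a minimal, self-contained Scala sketch of the "single StringType column means socket-style schema" check. Note the `DataType`/`StructField`/`StructType` definitions below are simplified stand-ins for illustration, not the real Spark classes, and `isSocketLikeSchema` is a hypothetical helper name:

```scala
// Simplified stand-ins for Spark SQL's schema types (assumption: these
// mimic only the fields the check needs, not the real Spark API).
sealed trait DataType
case object StringType extends DataType
case object IntegerType extends DataType
case class StructField(name: String, dataType: DataType)
case class StructType(fields: Array[StructField]) {
  def size: Int = fields.length
}

// The guard from the diff: a schema that is exactly one StringType field
// is indistinguishable (on the executor) from a socket source, so schema
// validation is skipped for it.
def isSocketLikeSchema(dataSchema: StructType): Boolean =
  dataSchema.size == 1 && dataSchema.fields(0).dataType == StringType

// A socket source always hands the executor a single string column.
val socketSchema = StructType(Array(StructField("value", StringType)))

// A file source carries the user-supplied or inferred multi-column schema.
val fileSchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("name", StringType)))
```

As the comment notes, the trade-off is that any genuine single-StringType-column file schema would also skip validation, since the executor cannot tell the two sources apart.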
---