Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r141343541
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider
with RelationProvider
* be put here. For example, user defined output committer can be
configured here
* by setting the output committer class in the conf of
spark.sql.sources.outputCommitterClass.
*/
- def prepareWrite(
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+
+ // Check if table with given path exists
+ validateTable(options.get("path").get)
+
+ /* Check id streaming data schema matches with carbon table schema
+ * Data from socket source does not have schema attached to it,
+ * Following check is to ignore schema validation for socket source.
+ */
+ if (!(dataSchema.size.equals(1) &&
+ dataSchema.fields(0).dataType.equals(StringType))) {
+ val path = options.get("path")
+ val tablePath: String = path match {
+ case Some(value) => value
+ case None => ""
+ }
+
+ val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+ getTableSchema(sparkSession: SparkSession, tablePath: String)
+ val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
+
+ if(!isSchemaValid) {
+ LOGGER.error("Schema Validation Failed: streaming data schema"
+ + "does not match with carbon table schema")
+ throw new InvalidSchemaException("Schema Validation Failed : " +
+ "streaming data schema does not match with carbon table schema")
+ }
+ }
+ new CarbonStreamingOutputWriterFactory()
+ }
+
+ /**
+ * Read schema from existing carbon table
+ * @param sparkSession
+ * @param tablePath carbon table path
+ * @return true if schema validation is successful else false
+ */
+ private def getTableSchema(
sparkSession: SparkSession,
- job: Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = new
CarbonStreamingOutputWriterFactory()
+ tablePath: String): org.apache.carbondata.format.TableSchema = {
+
+ val formattedTablePath = tablePath.replace('\\', '/')
+ val names = formattedTablePath.split("/")
+ if (names.length < 3) {
+ throw new IllegalArgumentException("invalid table path: " +
tablePath)
+ }
+ val tableName : String = names(names.length - 1)
+ val dbName : String = names(names.length - 2)
+ val storePath = formattedTablePath.substring(0,
+ formattedTablePath.lastIndexOf
+ (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
--- End diff --
Remove the unnecessary extra parentheses and the redundant `toString` call; `dbName.concat(...)` already returns a `String`.
---