Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138558394
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
* by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
*/
def prepareWrite(
- sparkSession: SparkSession,
- job: Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
-/**
- * When possible, this method should return the schema of the given `files`. When the format
- * does not support inference, or no valid files are given should return None. In these cases
- * Spark will require that user specify the schema manually.
- */
+ // Check if table with given path exists
+ validateTable(options("path"))
+
+ // Check if the streaming data schema matches the carbon table schema.
+ // Data from a socket source does not have a schema attached to it,
+ // so the following check skips schema validation for the socket source.
+ if (!(dataSchema.size == 1 &&
+ dataSchema.fields(0).dataType == StringType)) {
+ val path: String = options.getOrElse("path", "")
+ val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, path)
+ val schemaPath = path + "/Metadata/schema"
+ val schema: TableInfo = meta.readSchemaFile(schemaPath)
+ val isSchemaValid = validateSchema(schema, dataSchema)
+
+ if (!isSchemaValid) {
+ LOGGER.error("Schema Validation Failed: streaming data schema "
+ + "does not match with carbon table schema")
+ throw new InvalidSchemaException("Schema Validation Failed: " +
+ "streaming data schema does not match with carbon table schema")
+ }
+ }
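+ // All checks passed (or were skipped for schemaless socket input); hand back the streaming writer factory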
+ new CarbonStreamingOutputWriterFactory()
+ }
+
+ /**
+ * Reads the schema of an existing carbon table
+ * @param sparkSession current spark session
+ * @param tablePath carbon table path
+ * @return TableInfo read from the table's schema file
+ */
+ private def getTableSchema(sparkSession: SparkSession, tablePath: String): TableInfo = {
+ val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, tablePath)
+ val schemaPath = tablePath + "/Metadata/schema"
+ meta.readSchemaFile(schemaPath)
+ }
+
+ /**
+ * Validates streamed schema against existing table schema
+ * @param schema existing carbon table schema
+ * @param dataSchema streamed data schema
+ * @return true if schema validation is successful else false
+ */
+ private def validateSchema(schema: TableInfo, dataSchema: StructType): Boolean = {
+ val factTable: TableSchema = schema.getFact_table
+
+ import scala.collection.mutable.ListBuffer
+ import scala.collection.JavaConverters._
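+ // Sort table columns by schemaOrdinal so they can be compared positionally with the stream schema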
+ val columnSchemaValues = factTable.getTable_columns.asScala.sortBy(_.schemaOrdinal)
+
+ val columnDataTypes = new ListBuffer[String]()
+ for (columnSchema <- columnSchemaValues) {
+ columnDataTypes.append(columnSchema.data_type.toString)
+ }
+ val tableColumnDataTypeList = columnDataTypes.toList
+
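+ // Build the equivalent list of carbon type names from the incoming stream schema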
+ val streamSchemaDataTypes = new ListBuffer[String]()
+ for (i <- 0 until dataSchema.size) {
+ streamSchemaDataTypes.append(mapStreamingDataTypeToString(dataSchema.fields(i).dataType.toString))
+ }
+ val streamedDataTypeList = streamSchemaDataTypes.toList
+
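+ // Positional, exact comparison: column count, order and types must all agree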
+ tableColumnDataTypeList == streamedDataTypeList
+ }
+
+ /**
+ * Maps a spark streaming datatype name to the corresponding carbon datatype name
+ * @param dataType spark sql datatype name
+ * @return corresponding carbon datatype name
+ */
+ def mapStreamingDataTypeToString(dataType: String): String = {
+ dataType match {
+ case "IntegerType" => DataType.INT.toString
+ case "StringType" => DataType.STRING.toString
+ case "DateType" => DataType.DATE.toString
+ case "DoubleType" => DataType.DOUBLE.toString
+ case "FloatType" => DataType.DOUBLE.toString
+ case "LongType" => DataType.LONG.toString
+ case "ShortType" => DataType.SHORT.toString
+ case "TimestampType" => DataType.TIMESTAMP.toString
+ }
+ }
+
+ /**
+ * Validates that a table exists at the given path, otherwise throws an exception
+ * @param tablePath existing carbon table path
+ */
+ private def validateTable(tablePath: String): Unit = {
+
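+ // Expected layout: <storePath>/<dbName>/<tableName>, hence at least three path segments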
+ val formattedTablePath = tablePath.replace('\\', '/')
+ val names = formattedTablePath.split("/")
+ if (names.length < 3) {
+ throw new IllegalArgumentException("invalid table path: " + tablePath)
+ }
+ val tableName: String = names(names.length - 1)
+ val dbName: String = names(names.length - 2)
+ val storePath = formattedTablePath.substring(0,
+ formattedTablePath.lastIndexOf(dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName) - 1)
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
+ new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString))
+
+ if (!checkIfTableExists(absoluteTableIdentifier)) {
+ throw new NoSuchTableException(dbName, tableName)
+ }
+ }
+
+ /**
+ * Checks whether a table exists by probing for its schema file
+ * @param absoluteTableIdentifier identifier holding the table's store path, database and table name
+ * @return true if the schema file exists on any supported file system
+ */
+ private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
+ val carbonTablePath: CarbonTablePath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier)
+ val schemaFilePath: String = carbonTablePath.getSchemaFilePath
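+ // The table is considered to exist if its schema file is found on any supported file system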
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
+ }
+
+ /**
+ * If the user wants to stream data from a carbondata table source
+ * and the following conditions are true:
+ * 1. No schema is provided by the user in readStream()
+ * 2. spark.sql.streaming.schemaInference is set to true
+ * carbondata can infer the table schema from a valid table path.
+ * The schema inference is not mandatory, but good to have.
+ * When possible, this method should return the schema of the given `files`. When the format
+ * does not support inference, or no valid files are given should return None. In these cases
+ * Spark will require that user specify the schema manually.
+ */
def inferSchema(
--- End diff --
Where is this function used?
---