Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215972515
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
    @@ -52,19 +52,24 @@ object StreamJobManager {
         }
       }
     
    -  private def validateSinkTable(querySchema: StructType, sink: 
CarbonTable): Unit = {
    +  private def validateSinkTable(validateQuerySchema: Boolean,
    +                                querySchema: StructType, sink: 
CarbonTable): Unit = {
         if (!sink.isStreamingSink) {
           throw new MalformedCarbonCommandException(s"Table 
${sink.getTableName} is not " +
                                                     "streaming sink table " +
                                                     "('streaming' tblproperty 
is not 'sink' or 'true')")
         }
    -    val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map 
{ column =>
    -      StructField(column.getColName,
    -        
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
    -    }
    -    if (!querySchema.equals(StructType(fields))) {
    -      throw new MalformedCarbonCommandException(s"Schema of table 
${sink.getTableName} " +
    -                                                s"does not match query 
output")
    +    // TODO: validate query schema against sink in kafka (we cannot get 
schema directly)
    +    if (validateQuerySchema) {
    +      val fields = 
sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
    +        StructField(
    +          column.getColName,
    +          
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
    +      }
    +      if (!querySchema.equals(StructType(fields))) {
    +        throw new MalformedCarbonCommandException(s"Schema of table ${ 
sink.getTableName } " +
    --- End diff --
    
    you can move the first half to the next line so that we can avoid string 
concatenation here.
    The same with line#58


---

Reply via email to