GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215972515
--- Diff:
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
---
@@ -52,19 +52,24 @@ object StreamJobManager {
}
}
- private def validateSinkTable(querySchema: StructType, sink:
CarbonTable): Unit = {
+ private def validateSinkTable(validateQuerySchema: Boolean,
+ querySchema: StructType, sink:
CarbonTable): Unit = {
if (!sink.isStreamingSink) {
throw new MalformedCarbonCommandException(s"Table
${sink.getTableName} is not " +
"streaming sink table " +
"('streaming' tblproperty
is not 'sink' or 'true')")
}
- val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map
{ column =>
- StructField(column.getColName,
-
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
- }
- if (!querySchema.equals(StructType(fields))) {
- throw new MalformedCarbonCommandException(s"Schema of table
${sink.getTableName} " +
- s"does not match query
output")
+ // TODO: validate query schema against sink in kafka (we cannot get
schema directly)
+ if (validateQuerySchema) {
+ val fields =
sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
+ StructField(
+ column.getColName,
+
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
+ }
+ if (!querySchema.equals(StructType(fields))) {
+ throw new MalformedCarbonCommandException(s"Schema of table ${
sink.getTableName } " +
--- End diff --
You can move the first half of the string to the next line so that we can
avoid the string concatenation here.
The same applies to line #58.
---