[ https://issues.apache.org/jira/browse/KYLIN-5200 ]
Liu Zhao deleted comment on KYLIN-5200:
---------------------------------
was (Author: JIRAUSER288373):
[~xxyu], hi, does Apache Kylin have a technical exchange group? Could you add me?
Thanks
wechat: LZ_AHHF
> Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent
> ---------------------------------------------------------------------------
>
> Key: KYLIN-5200
> URL: https://issues.apache.org/jira/browse/KYLIN-5200
> Project: Kylin
> Issue Type: Bug
> Components: Metadata
> Affects Versions: v4.0.1
> Reporter: Liu Zhao
> Assignee: Liu Zhao
> Priority: Major
>
> I created a cube on Kylin 4.0.1 in which one measure is defined as RAW.
> When I query it after building, the Parquet schema and the Spark schema are
> inconsistent. During the build, the RAW measure written to Parquet is
> aggregated with Spark's max, and the data type of Max is the child's data
> type; in my cube that is decimal(19,4). At query time, however, RAW is
> always mapped to BinaryType in TableScanPlan. Should the StructType for RAW
> in TableScanPlan also use child.dataType?
> When building, the RAW type is child.dataType.
> @see org.apache.kylin.engine.spark.job.CuboidAggregator
> {code:java}
> measure.expression.toUpperCase(Locale.ROOT) match {
>   case "MAX" =>
>     max(columns.head).as(id.toString)
>   case "MIN" =>
>     min(columns.head).as(id.toString)
>   case "SUM" =>
>     sum(columns.head).as(id.toString)
>   case "COUNT" =>
>     if (reuseLayout) {
>       sum(columns.head).as(id.toString)
>     } else {
>       count(columns.head).as(id.toString)
>     }
>   case "COUNT_DISTINCT" =>
>     // for test
>     if (isSparkSql) {
>       countDistinct(columns.head).as(id.toString)
>     } else {
>       val cdAggregate = getCountDistinctAggregate(columns, measure.returnType, reuseLayout)
>       new Column(cdAggregate.toAggregateExpression()).as(id.toString)
>     }
>   case "TOP_N" =>
>     // Uses new TopN aggregate function
>     // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
>     val schema = StructType(measure.pra.map { col =>
>       val dateType = col.dataType
>       if (col == measure) {
>         StructField(s"MEASURE_${col.columnName}", dateType)
>       } else {
>         StructField(s"DIMENSION_${col.columnName}", dateType)
>       }
>     })
>     if (reuseLayout) {
>       new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
>         .toAggregateExpression()).as(id.toString)
>     } else {
>       new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
>         .toAggregateExpression()).as(id.toString)
>     }
>   case "PERCENTILE_APPROX" =>
>     val udfName = UdfManager.register(measure.returnType.toKylinDataType, measure.expression, null, !reuseLayout)
>     if (!reuseLayout) {
>       callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
>     } else {
>       callUDF(udfName, columns.head).as(id.toString)
>     }
>   case _ =>
>     // RAW matches here, but max's dataType is child.dataType
>     max(columns.head).as(id.toString)
> }
> }.toSeq
> {code}
> At query time, however, the RAW StructType is BinaryType.
> @see org.apache.kylin.query.runtime.plans.TableScanPlan,
> org.apache.spark.sql.utils.SparkTypeUtil
> {code:java}
> def toSparkType(dataTp: DataType, isSum: Boolean = false): org.apache.spark.sql.types.DataType = {
>   dataTp.getName match {
>     // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
>     case "decimal" =>
>       if (isSum) {
>         val i = dataTp.getPrecision + 10
>         DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
>       }
>       else DecimalType(dataTp.getPrecision, dataTp.getScale)
>     case "date" => DateType
>     case "time" => DateType
>     case "timestamp" => TimestampType
>     case "datetime" => DateType
>     case "tinyint" => if (isSum) LongType else ByteType
>     case "smallint" => if (isSum) LongType else ShortType
>     case "integer" => if (isSum) LongType else IntegerType
>     case "int4" => if (isSum) LongType else IntegerType
>     case "bigint" => LongType
>     case "long8" => LongType
>     case "float" => if (isSum) DoubleType else FloatType
>     case "double" => DoubleType
>     case tp if tp.startsWith("varchar") => StringType
>     case tp if tp.startsWith("char") => StringType
>     case "dim_dc" => LongType
>     case "boolean" => BooleanType
>     case tp if tp.startsWith("hllc") => BinaryType
>     case tp if tp.startsWith("bitmap") => BinaryType
>     case tp if tp.startsWith("extendedcolumn") => BinaryType
>     case tp if tp.startsWith("percentile") => BinaryType
>     case tp if tp.startsWith("raw") => BinaryType
>     case _ => throw new IllegalArgumentException(dataTp.toString)
>   }
> }
> {code}
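
The mismatch described in the quoted report can be sketched without Spark or Kylin on the classpath. The snippet below is only an illustration under stated assumptions: `SimpleType`, `BinaryT`, `DecimalT`, `RawMeasure`, and `RawSchemaCheck` are hypothetical stand-ins invented for this example, not Kylin or Spark classes, and `querySideTypeUsingChild` models the alternative the reporter asks about rather than any confirmed fix.

```scala
// Hypothetical stand-ins for org.apache.spark.sql.types.DataType (not Spark's classes).
sealed trait SimpleType
case object BinaryT extends SimpleType
final case class DecimalT(precision: Int, scale: Int) extends SimpleType

// Hypothetical description of a RAW measure: its Kylin type name and the
// type of the column it wraps (the "child" type).
final case class RawMeasure(kylinTypeName: String, childType: SimpleType)

object RawSchemaCheck {
  // Build side: RAW falls through to the max(...) branch, and max keeps
  // the type of its input column.
  def buildSideType(m: RawMeasure): SimpleType = m.childType

  // Query side as quoted above: any type name starting with "raw" maps to binary.
  def querySideType(m: RawMeasure): SimpleType =
    if (m.kylinTypeName.startsWith("raw")) BinaryT else m.childType

  // Alternative raised in the report (an assumption, not a confirmed fix):
  // resolve RAW to the child type when reading.
  def querySideTypeUsingChild(m: RawMeasure): SimpleType = m.childType

  // The written and read schemas agree only if both sides pick the same type.
  def isConsistent(written: SimpleType, read: SimpleType): Boolean = written == read
}
```

With a RAW measure over a decimal(19,4) column, `buildSideType` yields the decimal type while `querySideType` yields the binary type, so `isConsistent` is false; swapping in `querySideTypeUsingChild` makes the two sides agree in this toy model.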
--
This message was sent by Atlassian Jira
(v8.20.10#820010)