Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212318460
--- Diff:
integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype
+import org.apache.carbondata.core.metadata.datatype.{DataType =>
CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType,
StructField => CarbonStructField}
+import org.apache.carbondata.core.scan.expression.{ColumnExpression =>
CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression =>
CarbonLiteralExpression}
+import org.apache.carbondata.core.scan.expression.conditional._
+import org.apache.carbondata.core.scan.expression.logical.{AndExpression,
FalseExpression, OrExpression}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
+
+object CarbonSparkDataSourceUtil {
+
+ /**
+ * Convert from carbon datatype to sparks datatype
+ */
+ def convertCarbonToSparkDataType(dataType: CarbonDataType):
types.DataType = {
+ if (CarbonDataTypes.isDecimal(dataType)) {
+ DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
+ dataType.asInstanceOf[CarbonDecimalType].getScale)
+ } else {
+ dataType match {
+ case CarbonDataTypes.STRING => StringType
+ case CarbonDataTypes.SHORT => ShortType
+ case CarbonDataTypes.INT => IntegerType
+ case CarbonDataTypes.LONG => LongType
+ case CarbonDataTypes.DOUBLE => DoubleType
+ case CarbonDataTypes.BOOLEAN => BooleanType
+ case CarbonDataTypes.TIMESTAMP => TimestampType
+ case CarbonDataTypes.DATE => DateType
+ case CarbonDataTypes.VARCHAR => StringType
+ }
+ }
+ }
+
+ /**
+ * Convert from sparks datatype to carbon datatype
+ */
+ def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
+ dataType match {
+ case StringType => CarbonDataTypes.STRING
+ case ShortType => CarbonDataTypes.SHORT
+ case IntegerType => CarbonDataTypes.INT
+ case LongType => CarbonDataTypes.LONG
+ case DoubleType => CarbonDataTypes.DOUBLE
+ case FloatType => CarbonDataTypes.FLOAT
+ case DateType => CarbonDataTypes.DATE
+ case BooleanType => CarbonDataTypes.BOOLEAN
+ case TimestampType => CarbonDataTypes.TIMESTAMP
+ case ArrayType(elementType, _) =>
+
CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
+ case StructType(fields) =>
+ val carbonFields = new java.util.ArrayList[CarbonStructField]
+ fields.map { field =>
+ carbonFields.add(
+ new CarbonStructField(
+ field.name,
+ convertSparkToCarbonDataType(field.dataType)))
+ }
+ CarbonDataTypes.createStructType(carbonFields)
+ case NullType => CarbonDataTypes.NULL
+ case decimal: DecimalType =>
+ CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
+ case _ => throw new UnsupportedOperationException("getting " +
dataType + " from spark")
+ }
+ }
+
  /**
   * Converts data sources filters to carbon filter predicates.
   *
   * Returns None for any spark filter shape that cannot be pushed down; the
   * caller is then expected to evaluate that filter on the spark side.
   */
  def createCarbonFilter(schema: StructType,
      predicate: sources.Filter): Option[CarbonExpression] = {
    // Column name -> spark data type, used to type both column and literal expressions.
    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap

    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
      predicate match {

        case sources.EqualTo(name, value) =>
          Some(new EqualToExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        case sources.Not(sources.EqualTo(name, value)) =>
          Some(new NotEqualsExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        // NOTE(review): EqualNullSafe is translated the same as EqualTo, i.e. the
        // null-safe semantics (null <=> null being true) appear not to be
        // preserved here — confirm the carbon EqualToExpression handles this.
        case sources.EqualNullSafe(name, value) =>
          Some(new EqualToExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        case sources.Not(sources.EqualNullSafe(name, value)) =>
          Some(new NotEqualsExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        case sources.GreaterThan(name, value) =>
          Some(new GreaterThanExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        case sources.LessThan(name, value) =>
          Some(new LessThanExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        case sources.GreaterThanOrEqual(name, value) =>
          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        case sources.LessThanOrEqual(name, value) =>
          Some(new LessThanEqualToExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        case sources.In(name, values) =>
          // IN (null) can never match under SQL null semantics, so push down a
          // constant-false filter; otherwise nulls are stripped from the list.
          if (values.length == 1 && values(0) == null) {
            Some(new FalseExpression(getCarbonExpression(name)))
          } else {
            Some(new InExpression(getCarbonExpression(name),
              new ListExpression(
                convertToJavaList(values.filterNot(_ == null)
                  .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
          }
        case sources.Not(sources.In(name, values)) =>
          // NOT IN over a list containing null matches no rows (SQL three-valued
          // logic), hence the constant-false expression.
          if (values.contains(null)) {
            Some(new FalseExpression(getCarbonExpression(name)))
          } else {
            Some(new NotInExpression(getCarbonExpression(name),
              new ListExpression(
                convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
          }
        // IS NULL / IS NOT NULL are expressed as (not-)equality against a null
        // literal with the isNull flag set to true.
        case sources.IsNull(name) =>
          Some(new EqualToExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, null), true))
        case sources.IsNotNull(name) =>
          Some(new NotEqualsExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, null), true))
        case sources.And(lhs, rhs) =>
          // Partial pushdown is valid for AND: if only one side is convertible,
          // that side alone is still a correct (weaker) filter, so reduceOption
          // combines whichever sides succeeded.
          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
        case sources.Or(lhs, rhs) =>
          // OR must have BOTH sides convertible; a one-sided OR pushdown would
          // wrongly drop rows, so the for-comprehension yields None otherwise.
          for {
            lhsFilter <- createFilter(lhs)
            rhsFilter <- createFilter(rhs)
          } yield {
            new OrExpression(lhsFilter, rhsFilter)
          }
        // Empty prefix is excluded by the guard; it would match every row.
        case sources.StringStartsWith(name, value) if value.length > 0 =>
          Some(new StartsWithExpression(getCarbonExpression(name),
            getCarbonLiteralExpression(name, value)))
        // Anything else is not pushed down and is evaluated by spark.
        case _ => None
      }
    }

    // Builds a carbon column reference typed from the schema.
    def getCarbonExpression(name: String) = {
      new CarbonColumnExpression(name,
        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
    }

    // Builds a carbon literal typed like its column, except a Double literal
    // compared against a STRING column is kept as DOUBLE — presumably to match
    // spark's numeric/string comparison coercion; TODO confirm with callers.
    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
      val dataTypeOfAttribute =
        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
      val dataType =
        if (Option(value).isDefined &&
            dataTypeOfAttribute == CarbonDataTypes.STRING &&
            value.isInstanceOf[Double]) {
          CarbonDataTypes.DOUBLE
        } else {
          dataTypeOfAttribute
        }
      new CarbonLiteralExpression(value, dataType)
    }

    createFilter(predicate)
  }
+
+ // Convert scala list to java list, Cannot use scalaList.asJava as while
deserializing it is
+ // not able find the classes inside scala list and gives
ClassNotFoundException.
+ private def convertToJavaList(
+ scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression]
= {
+ val javaList = new java.util.ArrayList[CarbonExpression]()
+ scalaList.foreach(javaList.add)
+ javaList
+ }
+
+ /**
+ * Create load model for carbon
+ */
+ def prepareLoadModel(options: Map[String, String],
+ dataSchema: StructType): CarbonLoadModel = {
+ val schema = new Schema(dataSchema.fields.map { field =>
+ field.dataType match {
+ case s: StructType =>
+ new Field(field.name,
+ field.dataType.typeName,
+ s.fields
+ .map(f => new datatype.StructField(f.name,
+
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
+ case a: ArrayType =>
+ new Field(field.name,
+ field.dataType.typeName,
+ Seq(new datatype.StructField(field.name,
+
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
+ case other =>
+ new Field(field.name, field.dataType.simpleString)
+ }
+ })
+ val builder = new CarbonWriterBuilder
+ builder.isTransactionalTable(false)
+ builder.outputPath(options.getOrElse("path", ""))
+ val blockSize =
options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
+ if (blockSize.isDefined) {
+ builder.withBlockSize(blockSize.get)
+ }
+ val blockletSize = options.get("table_blockletsize").map(_.toInt)
+ if (blockletSize.isDefined) {
+ builder.withBlockletSize(blockletSize.get)
+ }
+
builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+ CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
+ builder.localDictionaryThreshold(
+ options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+ CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
+ builder.sortBy(
+
options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
+ builder.isTransactionalTable(false)
+ builder.uniqueIdentifier(System.currentTimeMillis())
--- End diff --
Please also add support for the `column_meta_cache` and `cache_level` table properties here.
---