Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212348045
  
    --- Diff: 
integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
 ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.metadata.datatype
    +import org.apache.carbondata.core.metadata.datatype.{DataType => 
CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, 
StructField => CarbonStructField}
    +import org.apache.carbondata.core.scan.expression.{ColumnExpression => 
CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => 
CarbonLiteralExpression}
    +import org.apache.carbondata.core.scan.expression.conditional._
    +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, 
FalseExpression, OrExpression}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
    +
    +object CarbonSparkDataSourceUtil {
    +
    +  /**
    +   * Convert from carbon datatype to sparks datatype
    +   */
    +  def convertCarbonToSparkDataType(dataType: CarbonDataType): 
types.DataType = {
    +    if (CarbonDataTypes.isDecimal(dataType)) {
    +      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
    +        dataType.asInstanceOf[CarbonDecimalType].getScale)
    +    } else {
    +      dataType match {
    +        case CarbonDataTypes.STRING => StringType
    +        case CarbonDataTypes.SHORT => ShortType
    +        case CarbonDataTypes.INT => IntegerType
    +        case CarbonDataTypes.LONG => LongType
    +        case CarbonDataTypes.DOUBLE => DoubleType
    +        case CarbonDataTypes.BOOLEAN => BooleanType
    +        case CarbonDataTypes.TIMESTAMP => TimestampType
    +        case CarbonDataTypes.DATE => DateType
    +        case CarbonDataTypes.VARCHAR => StringType
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Convert from sparks datatype to carbon datatype
    +   */
    +  def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
    +    dataType match {
    +      case StringType => CarbonDataTypes.STRING
    +      case ShortType => CarbonDataTypes.SHORT
    +      case IntegerType => CarbonDataTypes.INT
    +      case LongType => CarbonDataTypes.LONG
    +      case DoubleType => CarbonDataTypes.DOUBLE
    +      case FloatType => CarbonDataTypes.FLOAT
    +      case DateType => CarbonDataTypes.DATE
    +      case BooleanType => CarbonDataTypes.BOOLEAN
    +      case TimestampType => CarbonDataTypes.TIMESTAMP
    +      case ArrayType(elementType, _) =>
    +        
CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
    +      case StructType(fields) =>
    +        val carbonFields = new java.util.ArrayList[CarbonStructField]
    +        fields.map { field =>
    +          carbonFields.add(
    +            new CarbonStructField(
    +              field.name,
    +              convertSparkToCarbonDataType(field.dataType)))
    +        }
    +        CarbonDataTypes.createStructType(carbonFields)
    +      case NullType => CarbonDataTypes.NULL
    +      case decimal: DecimalType =>
    +        CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
    +      case _ => throw new UnsupportedOperationException("getting " + 
dataType + " from spark")
    +    }
    +  }
    +
    +  /**
    +   * Converts data sources filters to carbon filter predicates.
    +   */
    +  def createCarbonFilter(schema: StructType,
    +      predicate: sources.Filter): Option[CarbonExpression] = {
    +    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
    +
    +    def createFilter(predicate: sources.Filter): Option[CarbonExpression] 
= {
    +      predicate match {
    +
    +        case sources.EqualTo(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualTo(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.EqualNullSafe(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualNullSafe(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThan(name, value) =>
    +          Some(new GreaterThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThan(name, value) =>
    +          Some(new LessThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThanOrEqual(name, value) =>
    +          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThanOrEqual(name, value) =>
    +          Some(new LessThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.In(name, values) =>
    +          if (values.length == 1 && values(0) == null) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new InExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.filterNot(_ == null)
    +                  .map(filterValues => getCarbonLiteralExpression(name, 
filterValues)).toList))))
    +          }
    +        case sources.Not(sources.In(name, values)) =>
    +          if (values.contains(null)) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new NotInExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.map(f => 
getCarbonLiteralExpression(name, f)).toList))))
    +          }
    +        case sources.IsNull(name) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.IsNotNull(name) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.And(lhs, rhs) =>
    +          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new 
AndExpression(_, _))
    +        case sources.Or(lhs, rhs) =>
    +          for {
    +            lhsFilter <- createFilter(lhs)
    +            rhsFilter <- createFilter(rhs)
    +          } yield {
    +            new OrExpression(lhsFilter, rhsFilter)
    +          }
    +        case sources.StringStartsWith(name, value) if value.length > 0 =>
    +          Some(new StartsWithExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case _ => None
    +      }
    +    }
    +
    +    def getCarbonExpression(name: String) = {
    +      new CarbonColumnExpression(name,
    +        
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
    +    }
    +
    +    def getCarbonLiteralExpression(name: String, value: Any): 
CarbonExpression = {
    +      val dataTypeOfAttribute =
    +        
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
    +      val dataType =
    +        if (Option(value).isDefined &&
    +            dataTypeOfAttribute == CarbonDataTypes.STRING &&
    +            value.isInstanceOf[Double]) {
    +        CarbonDataTypes.DOUBLE
    +      } else {
    +        dataTypeOfAttribute
    +      }
    +      new CarbonLiteralExpression(value, dataType)
    +    }
    +
    +    createFilter(predicate)
    +  }
    +
    +  // Convert scala list to java list, Cannot use scalaList.asJava as while 
deserializing it is
    +  // not able find the classes inside scala list and gives 
ClassNotFoundException.
    +  private def convertToJavaList(
    +      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] 
= {
    +    val javaList = new java.util.ArrayList[CarbonExpression]()
    +    scalaList.foreach(javaList.add)
    +    javaList
    +  }
    +
    +  /**
    +   * Create load model for carbon
    +   */
    +  def prepareLoadModel(options: Map[String, String],
    +      dataSchema: StructType): CarbonLoadModel = {
    +    val schema = new Schema(dataSchema.fields.map { field =>
    +      field.dataType match {
    +        case s: StructType =>
    +          new Field(field.name,
    +            field.dataType.typeName,
    +            s.fields
    +              .map(f => new datatype.StructField(f.name,
    +                
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
    +        case a: ArrayType =>
    +          new Field(field.name,
    +            field.dataType.typeName,
    +            Seq(new datatype.StructField(field.name,
    +              
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
    +        case other =>
    +          new Field(field.name, field.dataType.simpleString)
    +      }
    +    })
    +    val builder = new CarbonWriterBuilder
    +    builder.isTransactionalTable(false)
    +    builder.outputPath(options.getOrElse("path", ""))
    +    val blockSize = 
options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
    +    if (blockSize.isDefined) {
    +      builder.withBlockSize(blockSize.get)
    +    }
    +    val blockletSize = options.get("table_blockletsize").map(_.toInt)
    +    if (blockletSize.isDefined) {
    +      builder.withBlockletSize(blockletSize.get)
    +    }
    +    
builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
    +      CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
    +    builder.localDictionaryThreshold(
    +      options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
    +        CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
    +    builder.sortBy(
    +      
options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
    +    builder.isTransactionalTable(false)
    +    builder.uniqueIdentifier(System.currentTimeMillis())
    --- End diff --
    
    ok


---

Reply via email to