Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212314571
  
    --- Diff: 
integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
 ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.metadata.datatype
    +import org.apache.carbondata.core.metadata.datatype.{DataType => 
CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, 
StructField => CarbonStructField}
    +import org.apache.carbondata.core.scan.expression.{ColumnExpression => 
CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => 
CarbonLiteralExpression}
    +import org.apache.carbondata.core.scan.expression.conditional._
    +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, 
FalseExpression, OrExpression}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
    +
    +object CarbonSparkDataSourceUtil {
    +
    +  /**
    +   * Convert from carbon datatype to sparks datatype
    +   */
    +  def convertCarbonToSparkDataType(dataType: CarbonDataType): 
types.DataType = {
    +    if (CarbonDataTypes.isDecimal(dataType)) {
    +      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
    +        dataType.asInstanceOf[CarbonDecimalType].getScale)
    +    } else {
    +      dataType match {
    +        case CarbonDataTypes.STRING => StringType
    +        case CarbonDataTypes.SHORT => ShortType
    +        case CarbonDataTypes.INT => IntegerType
    +        case CarbonDataTypes.LONG => LongType
    +        case CarbonDataTypes.DOUBLE => DoubleType
    +        case CarbonDataTypes.BOOLEAN => BooleanType
    +        case CarbonDataTypes.TIMESTAMP => TimestampType
    +        case CarbonDataTypes.DATE => DateType
    +        case CarbonDataTypes.VARCHAR => StringType
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Convert from sparks datatype to carbon datatype
    +   */
    +  def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
    +    dataType match {
    +      case StringType => CarbonDataTypes.STRING
    +      case ShortType => CarbonDataTypes.SHORT
    +      case IntegerType => CarbonDataTypes.INT
    +      case LongType => CarbonDataTypes.LONG
    +      case DoubleType => CarbonDataTypes.DOUBLE
    +      case FloatType => CarbonDataTypes.FLOAT
    +      case DateType => CarbonDataTypes.DATE
    +      case BooleanType => CarbonDataTypes.BOOLEAN
    +      case TimestampType => CarbonDataTypes.TIMESTAMP
    +      case ArrayType(elementType, _) =>
    +        
CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
    +      case StructType(fields) =>
    +        val carbonFields = new java.util.ArrayList[CarbonStructField]
    +        fields.map { field =>
    +          carbonFields.add(
    +            new CarbonStructField(
    +              field.name,
    +              convertSparkToCarbonDataType(field.dataType)))
    +        }
    +        CarbonDataTypes.createStructType(carbonFields)
    +      case NullType => CarbonDataTypes.NULL
    +      case decimal: DecimalType =>
    +        CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
    +      case _ => throw new UnsupportedOperationException("getting " + 
dataType + " from spark")
    +    }
    +  }
    +
    +  /**
    +   * Converts data sources filters to carbon filter predicates.
    +   */
    +  def createCarbonFilter(schema: StructType,
    +      predicate: sources.Filter): Option[CarbonExpression] = {
    +    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
    +
    +    def createFilter(predicate: sources.Filter): Option[CarbonExpression] 
= {
    +      predicate match {
    +
    +        case sources.EqualTo(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualTo(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.EqualNullSafe(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualNullSafe(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThan(name, value) =>
    +          Some(new GreaterThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThan(name, value) =>
    +          Some(new LessThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThanOrEqual(name, value) =>
    +          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThanOrEqual(name, value) =>
    +          Some(new LessThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.In(name, values) =>
    +          if (values.length == 1 && values(0) == null) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new InExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.filterNot(_ == null)
    +                  .map(filterValues => getCarbonLiteralExpression(name, 
filterValues)).toList))))
    +          }
    +        case sources.Not(sources.In(name, values)) =>
    +          if (values.contains(null)) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new NotInExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.map(f => 
getCarbonLiteralExpression(name, f)).toList))))
    +          }
    +        case sources.IsNull(name) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.IsNotNull(name) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.And(lhs, rhs) =>
    +          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new 
AndExpression(_, _))
    +        case sources.Or(lhs, rhs) =>
    +          for {
    +            lhsFilter <- createFilter(lhs)
    +            rhsFilter <- createFilter(rhs)
    +          } yield {
    +            new OrExpression(lhsFilter, rhsFilter)
    +          }
    +        case sources.StringStartsWith(name, value) if value.length > 0 =>
    +          Some(new StartsWithExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case _ => None
    +      }
    +    }
    +
    +    def getCarbonExpression(name: String) = {
    --- End diff --
    
    Remove duplicate methods (CarbonFilters also has)


---

Reply via email to