Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212314571
--- Diff:
integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype
+import org.apache.carbondata.core.metadata.datatype.{DataType =>
CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType,
StructField => CarbonStructField}
+import org.apache.carbondata.core.scan.expression.{ColumnExpression =>
CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression =>
CarbonLiteralExpression}
+import org.apache.carbondata.core.scan.expression.conditional._
+import org.apache.carbondata.core.scan.expression.logical.{AndExpression,
FalseExpression, OrExpression}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
+
+object CarbonSparkDataSourceUtil {
+
+ /**
+ * Convert from carbon datatype to sparks datatype
+ */
+ def convertCarbonToSparkDataType(dataType: CarbonDataType):
types.DataType = {
+ if (CarbonDataTypes.isDecimal(dataType)) {
+ DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
+ dataType.asInstanceOf[CarbonDecimalType].getScale)
+ } else {
+ dataType match {
+ case CarbonDataTypes.STRING => StringType
+ case CarbonDataTypes.SHORT => ShortType
+ case CarbonDataTypes.INT => IntegerType
+ case CarbonDataTypes.LONG => LongType
+ case CarbonDataTypes.DOUBLE => DoubleType
+ case CarbonDataTypes.BOOLEAN => BooleanType
+ case CarbonDataTypes.TIMESTAMP => TimestampType
+ case CarbonDataTypes.DATE => DateType
+ case CarbonDataTypes.VARCHAR => StringType
+ }
+ }
+ }
+
+ /**
+ * Convert from sparks datatype to carbon datatype
+ */
+ def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
+ dataType match {
+ case StringType => CarbonDataTypes.STRING
+ case ShortType => CarbonDataTypes.SHORT
+ case IntegerType => CarbonDataTypes.INT
+ case LongType => CarbonDataTypes.LONG
+ case DoubleType => CarbonDataTypes.DOUBLE
+ case FloatType => CarbonDataTypes.FLOAT
+ case DateType => CarbonDataTypes.DATE
+ case BooleanType => CarbonDataTypes.BOOLEAN
+ case TimestampType => CarbonDataTypes.TIMESTAMP
+ case ArrayType(elementType, _) =>
+
CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
+ case StructType(fields) =>
+ val carbonFields = new java.util.ArrayList[CarbonStructField]
+ fields.map { field =>
+ carbonFields.add(
+ new CarbonStructField(
+ field.name,
+ convertSparkToCarbonDataType(field.dataType)))
+ }
+ CarbonDataTypes.createStructType(carbonFields)
+ case NullType => CarbonDataTypes.NULL
+ case decimal: DecimalType =>
+ CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
+ case _ => throw new UnsupportedOperationException("getting " +
dataType + " from spark")
+ }
+ }
+
+ /**
+ * Converts data sources filters to carbon filter predicates.
+ */
+ def createCarbonFilter(schema: StructType,
+ predicate: sources.Filter): Option[CarbonExpression] = {
+ val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+ def createFilter(predicate: sources.Filter): Option[CarbonExpression]
= {
+ predicate match {
+
+ case sources.EqualTo(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualTo(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.EqualNullSafe(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualNullSafe(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThan(name, value) =>
+ Some(new GreaterThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThan(name, value) =>
+ Some(new LessThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThanOrEqual(name, value) =>
+ Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThanOrEqual(name, value) =>
+ Some(new LessThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.In(name, values) =>
+ if (values.length == 1 && values(0) == null) {
+ Some(new FalseExpression(getCarbonExpression(name)))
+ } else {
+ Some(new InExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.filterNot(_ == null)
+ .map(filterValues => getCarbonLiteralExpression(name,
filterValues)).toList))))
+ }
+ case sources.Not(sources.In(name, values)) =>
+ if (values.contains(null)) {
+ Some(new FalseExpression(getCarbonExpression(name)))
+ } else {
+ Some(new NotInExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.map(f =>
getCarbonLiteralExpression(name, f)).toList))))
+ }
+ case sources.IsNull(name) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.IsNotNull(name) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.And(lhs, rhs) =>
+ (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new
AndExpression(_, _))
+ case sources.Or(lhs, rhs) =>
+ for {
+ lhsFilter <- createFilter(lhs)
+ rhsFilter <- createFilter(rhs)
+ } yield {
+ new OrExpression(lhsFilter, rhsFilter)
+ }
+ case sources.StringStartsWith(name, value) if value.length > 0 =>
+ Some(new StartsWithExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case _ => None
+ }
+ }
+
+ def getCarbonExpression(name: String) = {
--- End diff --
Remove duplicate methods (CarbonFilters also has)
---