This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e5b1dd0  [CARBONDATA-4167][CARBONDATA-4168] Fix case sensitive issues and input validation for Geo values.
e5b1dd0 is described below

commit e5b1dd06e77d89c1c0e77e471f12058f4fdca0a3
Author: ShreelekhyaG <[email protected]>
AuthorDate: Tue Apr 6 14:12:40 2021 +0530

    [CARBONDATA-4167][CARBONDATA-4168] Fix case sensitive issues and input validation for Geo values.
    
    Why is this PR needed?
    1. The SPATIAL_INDEX property and the POLYGON, LINESTRING, and RANGELIST UDFs are case-sensitive.
    2. SPATIAL_INDEX.xx.gridSize and SPATIAL_INDEX.xxx.conversionRatio accept negative values.
    3. Geo UDFs accept invalid input values.
    
    What changes were proposed in this PR?
    1. Converted properties to lower case and made the UDFs case-insensitive.
    2. Added validation for property values and geo UDF inputs (sketched after this message).
    3. Refactored readAllIIndexOfSegment.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4118
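
    For illustration, a minimal sketch (not part of this commit) of how the new
    POSITIVE_INTEGER_REGEX added to GeoConstants rejects the negative and
    non-integer values described in point 2; the demo class and its main method
    are hypothetical:

        import java.util.regex.Pattern;

        public class PositiveIntegerRegexDemo {
          // same pattern the patch adds to GeoConstants
          static final String POSITIVE_INTEGER_REGEX = "^[+]?\\d*[1-9]\\d*$";

          public static void main(String[] args) {
            Pattern p = Pattern.compile(POSITIVE_INTEGER_REGEX);
            System.out.println(p.matcher("50").find());   // true: valid gridSize
            System.out.println(p.matcher("+50").find());  // true: explicit plus sign
            System.out.println(p.matcher("-50").find());  // false: negative rejected
            System.out.println(p.matcher("0").find());    // false: zero rejected
            System.out.println(p.matcher("4.5").find());  // false: not an integer
          }
        }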
---
 .../blockletindex/SegmentIndexFileStore.java       | 28 ++--------
 .../core/writer/CarbonIndexFileMergeWriter.java    |  3 +-
 .../org/apache/carbondata/geo/GeoConstants.java    |  5 ++
 .../org/apache/carbondata/geo/GeoHashIndex.java    | 12 +----
 .../org/apache/carbondata/geo/GeoHashUtils.java    | 38 ++++++++++---
 .../geo/scan/expression/PolygonListExpression.java | 10 +++-
 .../expression/PolygonRangeListExpression.java     | 10 +++-
 .../scan/expression/PolylineListExpression.java    |  5 +-
 .../org/apache/carbondata/geo/GeoUtilUDFs.scala    | 29 +++++-----
 .../org/apache/carbondata/geo/InPolygonUDF.scala   |  4 +-
 .../table/CarbonDescribeFormattedCommand.scala     | 12 +++++
 .../execution/strategy/CarbonSourceStrategy.scala  | 41 +++++++++++++-
 .../apache/spark/sql/optimizer/CarbonFilters.scala |  7 +++
 .../spark/sql/parser/CarbonSparkSqlParser.scala    |  9 +++-
 .../sql/parser/CarbonSparkSqlParserUtil.scala      |  2 +-
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 62 ++++++++++++++++++++--
 16 files changed, 208 insertions(+), 69 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 60c8981..3c41ea9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -197,12 +197,11 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public void readAllIIndexOfSegment(CarbonFile[] carbonFiles) throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(carbonFiles);
-    for (CarbonFile carbonIndexFile : carbonIndexFiles) {
-      if (carbonIndexFile.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        readMergeFile(carbonIndexFile.getCanonicalPath());
-      } else if (carbonIndexFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
-        readIndexFile(carbonIndexFile);
+    for (CarbonFile carbonFile : carbonFiles) {
+      if (carbonFile.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        readMergeFile(carbonFile.getCanonicalPath());
+      } else if (carbonFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        readIndexFile(carbonFile);
       }
     }
   }
@@ -391,23 +390,6 @@ public class SegmentIndexFileStore {
   }
 
   /**
-   * List all the index files of the segment.
-   *
-   * @param carbonFiles
-   * @return
-   */
-  public static CarbonFile[] getCarbonIndexFiles(CarbonFile[] carbonFiles) {
-    List<CarbonFile> indexFiles = new ArrayList<>();
-    for (CarbonFile file: carbonFiles) {
-      if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
-          file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        indexFiles.add(file);
-      }
-    }
-    return indexFiles.toArray(new CarbonFile[indexFiles.size()]);
-  }
-
-  /**
    * Return the map that contain index file name and content of the file.
    *
    * @return
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index a619f15..2c455d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -200,13 +200,12 @@ public class CarbonIndexFileMergeWriter {
       if (readBasedOnUUID) {
         indexFiles = SegmentIndexFileStore
            .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
-        fileStore.readAllIIndexOfSegment(segmentPath, uuid);
       } else {
         // The uuid can be different, when we add load from external path.
         indexFiles =
            SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
-        fileStore.readAllIIndexOfSegment(segmentPath);
       }
+      fileStore.readAllIIndexOfSegment(indexFiles);
     }
     Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
    Map<String, List<String>> mergeToIndexFileMap = fileStore.getCarbonMergeFileToIndexFilesMap();
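
For context on the readAllIIndexOfSegment refactor above: the caller now lists
the index files once and hands that same array to the file store, instead of
re-scanning the segment path a second time. A simplified, hypothetical sketch of
the resulting call pattern (the surrounding setup and the no-arg constructor are
assumptions, not part of this patch):

    // list the segment's index files once ...
    CarbonFile[] indexFiles =
        SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
    // ... then read exactly those files; readAllIIndexOfSegment(CarbonFile[]) no
    // longer filters the array itself, which is why the old
    // getCarbonIndexFiles(CarbonFile[]) helper could be removed
    SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
    fileStore.readAllIIndexOfSegment(indexFiles);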
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
index 2b1b6b7..0520f1e 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
@@ -27,6 +27,9 @@ public class GeoConstants {
   // GeoHash type Spatial Index
   public static final String GEOHASH = "geohash";
 
+  // Regular expression to validate whether the input value is positive integer
+  public static final String POSITIVE_INTEGER_REGEX = "^[+]?\\d*[1-9]\\d*$";
+
   // Regular expression to parse input polygons for IN_POLYGON_LIST
  public static final String POLYGON_REG_EXPRESSION = "(?<=POLYGON \\(\\()(.*?)(?=(\\)\\)))";
 
@@ -36,6 +39,8 @@ public class GeoConstants {
   // Regular expression to parse input rangelists for IN_POLYGON_RANGE_LIST
  public static final String RANGELIST_REG_EXPRESSION = "(?<=RANGELIST \\()(.*?)(?=\\))";
 
+  public static final String GRID_SIZE = "gridSize";
+
   // delimiter of input points or ranges
   public static final String DEFAULT_DELIMITER = ",";
 
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java b/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
index 4a5e892..b022cba 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
@@ -136,18 +136,10 @@ public class GeoHashIndex extends CustomIndex<List<Long[]>> {
     }
     String GRID_SIZE = commonKey + "gridsize";
     String gridSize = properties.get(GRID_SIZE);
-    if (StringUtils.isEmpty(gridSize)) {
-      throw new MalformedCarbonCommandException(
-              String.format("%s property is invalid. %s property must be specified.",
-                      CarbonCommonConstants.SPATIAL_INDEX, GRID_SIZE));
-    }
+    GeoHashUtils.validateGeoProperty(gridSize, GRID_SIZE);
     String CONVERSION_RATIO = commonKey + "conversionratio";
     String conversionRatio = properties.get(CONVERSION_RATIO);
-    if (StringUtils.isEmpty(conversionRatio)) {
-      throw new MalformedCarbonCommandException(
-              String.format("%s property is invalid. %s property must be specified.",
-                      CarbonCommonConstants.SPATIAL_INDEX, CONVERSION_RATIO));
-    }
+    GeoHashUtils.validateGeoProperty(conversionRatio, CONVERSION_RATIO);
 
     // Fill the values to the instance fields
     this.oriLatitude = Double.valueOf(originLatitude);
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java b/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
index e5e7a78..09baf5c 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
@@ -23,6 +23,14 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.regex.Pattern;
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+import static org.apache.carbondata.geo.GeoConstants.POSITIVE_INTEGER_REGEX;
+
+import org.apache.commons.lang3.StringUtils;
 
 public class GeoHashUtils {
 
@@ -83,6 +91,28 @@ public class GeoHashUtils {
     return colRow2GeoID(ij[0], ij[1]);
   }
 
+  public static void validateUDFInputValue(Object input, String inputName, String datatype)
+      throws MalformedCarbonCommandException {
+    if (inputName.equalsIgnoreCase(GeoConstants.GRID_SIZE) && (input == null
+        || !Pattern.compile(POSITIVE_INTEGER_REGEX).matcher(input.toString()).find())) {
+      throw new MalformedCarbonCommandException("Expect grid size to be a positive integer");
+    } else if (input == null || input.toString().equals("null")) {
+      throw new MalformedCarbonCommandException(
+          "Expect " + inputName + " to be of " + datatype + " type");
+    }
+  }
+
+  public static void validateGeoProperty(String propertyValue, String propertyName)
+      throws MalformedCarbonCommandException {
+    if (StringUtils.isEmpty(propertyValue) ||
+        !Pattern.compile(POSITIVE_INTEGER_REGEX).matcher(propertyValue).find()) {
+      throw new MalformedCarbonCommandException(
+          String.format("%s property is invalid. %s property must be specified, "
+                  + "and the value must be positive integer.",
+              CarbonCommonConstants.SPATIAL_INDEX, propertyName));
+    }
+  }
+
   /**
   * Calculate geo id through grid index coordinates, the row and column of grid coordinates
    * can be transformed by latitude and longitude
@@ -264,12 +294,8 @@ public class GeoHashUtils {
    * @return geoId range list of processed set
    */
   public static List<Long[]> processRangeList(List<Long[]> rangeListA, List<Long[]> rangeListB,
-      String opType) {
+      GeoOperationType operationType) {
     List<Long[]> processedRangeList;
-    GeoOperationType operationType = GeoOperationType.getEnum(opType);
-    if (operationType == null) {
-      throw new RuntimeException("Unsupported operation type " + opType);
-    }
     switch (operationType) {
       case OR:
         processedRangeList = getPolygonUnion(rangeListA, rangeListB);
@@ -278,7 +304,7 @@ public class GeoHashUtils {
         processedRangeList = getPolygonIntersection(rangeListA, rangeListB);
         break;
       default:
-        throw new RuntimeException("Unsupported operation type " + opType);
+        throw new RuntimeException("Unsupported operation type " + operationType.toString());
     }
     return processedRangeList;
   }
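
With this change, the opType string is resolved to a GeoOperationType once,
before any range lists are combined, so an invalid operator fails fast instead
of being re-checked on every processRangeList call. A self-contained,
hypothetical analog of that parse-once pattern (the local enum and getEnum stand
in for the real GeoOperationType, which likewise returns null for unknown
input):

    public class ParseOnceDemo {
      enum GeoOperationType {
        OR, AND;

        // analog of GeoOperationType.getEnum: null for unrecognized input
        static GeoOperationType getEnum(String s) {
          for (GeoOperationType t : values()) {
            if (t.name().equalsIgnoreCase(s)) {
              return t;
            }
          }
          return null;
        }
      }

      public static void main(String[] args) {
        String opType = "45"; // the invalid operator used in the new test
        GeoOperationType operationType = GeoOperationType.getEnum(opType);
        if (operationType == null) {
          // thrown before any range list is processed
          throw new RuntimeException("Unsupported operation type " + opType);
        }
      }
    }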
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
index 903b694..3250b38 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.util.CustomIndex;
 import org.apache.carbondata.geo.GeoConstants;
 import org.apache.carbondata.geo.GeoHashUtils;
+import org.apache.carbondata.geo.GeoOperationType;
 
 /**
  * InPolygonList expression processor. It inputs the InPolygonList string to the Geo
@@ -49,7 +50,8 @@ public class PolygonListExpression extends PolygonExpression {
     try {
       // 1. parse the polygon list string
       List<String> polygons = new ArrayList<>();
-      Pattern pattern = Pattern.compile(GeoConstants.POLYGON_REG_EXPRESSION);
+      Pattern pattern =
+          Pattern.compile(GeoConstants.POLYGON_REG_EXPRESSION, Pattern.CASE_INSENSITIVE);
       Matcher matcher = pattern.matcher(polygon);
       while (matcher.find()) {
         String matchedStr = matcher.group();
@@ -62,11 +64,15 @@ public class PolygonListExpression extends PolygonExpression {
       // 2. get the range list of each polygon
       List<Long[]> processedRangeList = instance.query(polygons.get(0));
       GeoHashUtils.validateRangeList(processedRangeList);
+      GeoOperationType operationType = GeoOperationType.getEnum(opType);
+      if (operationType == null) {
+        throw new RuntimeException("Unsupported operation type " + opType);
+      }
       for (int i = 1; i < polygons.size(); i++) {
         List<Long[]> tempRangeList = instance.query(polygons.get(i));
         GeoHashUtils.validateRangeList(tempRangeList);
         processedRangeList = GeoHashUtils.processRangeList(
-            processedRangeList, tempRangeList, opType);
+            processedRangeList, tempRangeList, operationType);
       }
       ranges = processedRangeList;
     } catch (Exception e) {
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
index 81efe05..704128d 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.util.CustomIndex;
 import org.apache.carbondata.geo.GeoConstants;
 import org.apache.carbondata.geo.GeoHashUtils;
+import org.apache.carbondata.geo.GeoOperationType;
 
 /**
  * InPolygonRangeList expression processor. It inputs the InPolygonRangeList string to
@@ -49,7 +50,8 @@ public class PolygonRangeListExpression extends PolygonExpression {
   public void processExpression() {
     // 1. parse the range list string
     List<String> rangeLists = new ArrayList<>();
-    Pattern pattern = Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION);
+    Pattern pattern =
+        Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION, Pattern.CASE_INSENSITIVE);
     Matcher matcher = pattern.matcher(polygon);
     while (matcher.find()) {
       String matchedStr = matcher.group();
@@ -57,11 +59,15 @@ public class PolygonRangeListExpression extends PolygonExpression {
     }
     // 2. process the range lists
     if (rangeLists.size() > 0) {
+      GeoOperationType operationType = GeoOperationType.getEnum(opType);
+      if (operationType == null) {
+        throw new RuntimeException("Unsupported operation type " + opType);
+      }
+      List<Long[]> processedRangeList = getRangeListFromString(rangeLists.get(0));
       for (int i = 1; i < rangeLists.size(); i++) {
         List<Long[]> tempRangeList = getRangeListFromString(rangeLists.get(i));
         processedRangeList = GeoHashUtils.processRangeList(
-            processedRangeList, tempRangeList, opType);
+            processedRangeList, tempRangeList, operationType);
       }
       ranges = processedRangeList;
     }
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
index 70ea7f3..61fcb3d 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
@@ -61,7 +61,8 @@ public class PolylineListExpression extends PolygonExpression {
       // 1. parse the polyline list string and get polygon from each polyline
       List<Geometry> polygonList = new ArrayList<>();
       WKTReader wktReader = new WKTReader();
-      Pattern pattern = Pattern.compile(GeoConstants.POLYLINE_REG_EXPRESSION);
+      Pattern pattern =
+          Pattern.compile(GeoConstants.POLYLINE_REG_EXPRESSION, Pattern.CASE_INSENSITIVE);
       Matcher matcher = pattern.matcher(polygon);
       while (matcher.find()) {
         String matchedStr = matcher.group();
@@ -80,7 +81,7 @@ public class PolylineListExpression extends PolygonExpression {
           List<Long[]> tempRangeList = instance.query(tempPointList);
           GeoHashUtils.validateRangeList(tempRangeList);
           processedRangeList = GeoHashUtils.processRangeList(
-            processedRangeList, tempRangeList, GeoOperationType.OR.toString());
+            processedRangeList, tempRangeList, GeoOperationType.OR);
         }
         ranges = processedRangeList;
       }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
index a2e7a42..e25768a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
@@ -33,34 +33,39 @@ object GeoUtilUDFs {
   }
 }
 
-class GeoIdToGridXyUDF extends (Long => Array[Int]) with Serializable {
-  override def apply(geoId: Long): Array[Int] = {
+class GeoIdToGridXyUDF extends (java.lang.Long => Array[Int]) with Serializable {
+  override def apply(geoId: java.lang.Long): Array[Int] = {
     GeoHashUtils.geoID2ColRow(geoId)
   }
 }
 
-class GeoIdToLatLngUDF extends ((Long, Double, Int) => Array[Double]) with Serializable {
-  override def apply(geoId: Long, oriLatitude: Double, gridSize: Int): Array[Double] = {
+class GeoIdToLatLngUDF
+  extends ((java.lang.Long, java.lang.Double, java.lang.Integer) => Array[Double]) with
+    Serializable {
+  override def apply(geoId: java.lang.Long, oriLatitude: java.lang.Double,
+      gridSize: java.lang.Integer): Array[Double] = {
     GeoHashUtils.geoID2LatLng(geoId, oriLatitude, gridSize)
   }
 }
 
-class LatLngToGeoIdUDF extends ((Long, Long, Double, Int) => Long) with Serializable {
-  override def apply(latitude: Long, longitude: Long, oriLatitude: Double, gridSize: Int): Long = {
+class LatLngToGeoIdUDF extends ((java.lang.Long, java.lang.Long,
+  java.lang.Double, java.lang.Integer) => Long) with Serializable {
+  override def apply(latitude: java.lang.Long, longitude: java.lang.Long,
+      oriLatitude: java.lang.Double, gridSize: java.lang.Integer): Long = {
     GeoHashUtils.lonLat2GeoID(longitude, latitude, oriLatitude, gridSize)
   }
 }
 
-class ToUpperLayerGeoIdUDF extends (Long => Long) with Serializable {
-  override def apply(geoId: Long): Long = {
+class ToUpperLayerGeoIdUDF extends (java.lang.Long => Long) with Serializable {
+  override def apply(geoId: java.lang.Long): Long = {
     GeoHashUtils.convertToUpperLayerGeoId(geoId)
   }
 }
 
-class ToRangeListUDF extends ((String, Double, Int) => mutable.Buffer[Array[Long]])
-  with Serializable {
-  override def apply(polygon: String, oriLatitude: Double,
-                     gridSize: Int): mutable.Buffer[Array[Long]] = {
+class ToRangeListUDF extends ((java.lang.String, java.lang.Double, java.lang.Integer) =>
+  mutable.Buffer[Array[Long]]) with Serializable {
+  override def apply(polygon: java.lang.String, oriLatitude: java.lang.Double,
+      gridSize: java.lang.Integer): mutable.Buffer[Array[Long]] = {
    GeoHashUtils.getRangeList(polygon, oriLatitude, gridSize).asScala.map(_.map(Long2long))
   }
 }
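
The UDF signatures above switch from Scala primitives (Long, Double, Int) to the
boxed java.lang types, presumably so that a SQL NULL or an un-castable literal
reaches the validation added in CarbonSourceStrategy as null instead of failing
on unboxing. A minimal, hypothetical Java illustration of the difference:

    public class BoxedNullDemo {
      public static void main(String[] args) {
        Long geoId = null;                 // boxed: null survives to validation
        System.out.println(geoId == null); // true, so it can be rejected cleanly

        // long raw = geoId;               // primitive: would throw
        //                                 // NullPointerException on unboxing
      }
    }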
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
index ff618b5..8bed2fb 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
@@ -46,8 +46,8 @@ class InPolygonListUDF extends ((String, String) => Boolean) with Serializable {
 }
 
 @InterfaceAudience.Internal
-class InPolylineListUDF extends ((String, Float) => Boolean) with Serializable {
-  override def apply(v1: String, v2: Float): Boolean = {
+class InPolylineListUDF extends ((String, java.lang.Float) => Boolean) with Serializable {
+  override def apply(v1: String, v2: java.lang.Float): Boolean = {
     true // Carbon applies the filter. So, Spark do not have to apply filter.
   }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 64ac71e..505c481 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -78,6 +78,18 @@ private[sql] case class CarbonDescribeFormattedCommand(
               tblProps(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$index.datatype"), ""),
             ("Sources Columns",
               tblProps(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$index.sourcecolumns"), ""))
+          if (tblProps.contains(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.originlatitude")) {
+            results ++= Seq(("Origin Latitude",
+              tblProps(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.originlatitude"), ""))
+          }
+          if (tblProps.contains(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.gridsize")) {
+            results ++= Seq(("Grid Size",
+              tblProps(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.gridsize"), ""))
+          }
+          if (tblProps.contains(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.conversionratio")) {
+            results ++= Seq(("Conversion Ratio",
+              tblProps(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.conversionratio"), ""))
+          }
           if (indexList.length != count) {
             results ++= Seq(("", "", ""))
           }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index be8b0e3..fdc3eaa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.scan.expression.{Expression => CarbonFilter}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.geo.{InPolygonListUDF, InPolygonRangeListUDF, InPolygonUDF, InPolylineListUDF}
+import org.apache.carbondata.geo.{GeoHashUtils, GeoIdToGridXyUDF, GeoIdToLatLngUDF, InPolygonListUDF, InPolygonRangeListUDF, InPolygonUDF, InPolylineListUDF, LatLngToGeoIdUDF, ToRangeListUDF, ToUpperLayerGeoIdUDF}
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.index.{TextMatchMaxDocUDF, TextMatchUDF}
 
@@ -62,10 +62,49 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
         } catch {
           case _: CarbonPhysicalPlanException => Nil
         }
+      case Project(projectList, _: OneRowRelation) if validateUdf(projectList) => Nil
       case _ => Nil
     }
   }
 
+  private def validateUdf(projects: Seq[NamedExpression]): Boolean = {
+    projects foreach {
+      case alias: Alias if alias.child.isInstanceOf[Expression] =>
+        alias.child match {
+          case Cast(s: ScalaUDF, _, _) => validateGeoUtilUDFs(s)
+          case s: ScalaUDF => validateGeoUtilUDFs(s)
+          case _ =>
+        }
+    }
+    true
+  }
+
+  def validateGeoUtilUDFs(s: ScalaUDF): Boolean = {
+    s.function match {
+      case _: ToRangeListUDF =>
+        val inputNames = List("polygon", "oriLatitude", "gridSize")
+        for (i <- s.children.indices) {
+          GeoHashUtils.validateUDFInputValue(s.children(i), inputNames(i), s.inputTypes(i).typeName)
+        }
+      case _: LatLngToGeoIdUDF =>
+        val inputNames = List("latitude", "longitude", "oriLatitude", "gridSize")
+        for (i <- s.children.indices) {
+          GeoHashUtils.validateUDFInputValue(s.children(i), inputNames(i), s.inputTypes(i).typeName)
+        }
+      case _: GeoIdToGridXyUDF =>
+        GeoHashUtils.validateUDFInputValue(s.children.head, "geoId", s.inputTypes.head.typeName)
+      case _: GeoIdToLatLngUDF =>
+        val inputNames = List("geoId", "oriLatitude", "gridSize")
+        for (i <- s.children.indices) {
+          GeoHashUtils.validateUDFInputValue(s.children(i), inputNames(i), s.inputTypes(i).typeName)
+        }
+      case _: ToUpperLayerGeoIdUDF =>
+        GeoHashUtils.validateUDFInputValue(s.children.head, "geoId", s.inputTypes.head.typeName)
+      case _ =>
+    }
+    true
+  }
+
   private def isCarbonRelation(logicalRelation: LogicalRelation): Boolean = {
     logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
   }
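
With the new Project-over-OneRowRelation case above, even a geo UDF invoked
without a table (for example "select LatLngToGeoId(39930753, 116302895,
39.832277, -50)") is validated while the plan is being strategized. A
self-contained, hypothetical analog of validateUDFInputValue applied to that
call (IllegalArgumentException stands in for MalformedCarbonCommandException):

    public class UdfInputValidationDemo {
      // simplified analog of GeoHashUtils.validateUDFInputValue
      static void validateUDFInputValue(Object input, String inputName, String datatype) {
        if ("gridSize".equalsIgnoreCase(inputName)
            && (input == null || !input.toString().matches("^[+]?\\d*[1-9]\\d*$"))) {
          throw new IllegalArgumentException("Expect grid size to be a positive integer");
        } else if (input == null || "null".equals(input.toString())) {
          throw new IllegalArgumentException("Expect " + inputName + " to be of " + datatype + " type");
        }
      }

      public static void main(String[] args) {
        Object[] children = {39930753L, 116302895L, 39.832277D, -50};
        String[] names = {"latitude", "longitude", "oriLatitude", "gridSize"};
        String[] types = {"long", "long", "double", "integer"};
        for (int i = 0; i < children.length; i++) {
          // throws on the last argument: -50 is not a positive integer
          validateUDFInputValue(children[i], names[i], types[i]);
        }
      }
    }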
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 8bd680e..3e30a97 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -243,6 +243,13 @@ object CarbonFilters {
          throw new MalformedCarbonCommandException("Expect two string in polyline list")
         }
         val (columnName, instance) = getGeoHashHandler(relation.carbonTable)
+        if (scala.util.Try(children.last.toString().toFloat).isFailure) {
+          throw new MalformedCarbonCommandException("Expect buffer size to be of float type")
+        }
+        val bufferSize = children.last.toString().toFloat
+        if (bufferSize <= 0) {
+          throw new MalformedCarbonCommandException("Expect buffer size to be a positive value")
+        }
         Some(new PolylineListExpression(children.head.toString(),
           children.last.toString().toFloat, columnName, instance))
       case _: InPolygonRangeListUDF =>
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 82d24f9..50ad29e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.needToConvertToLowerCase
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
@@ -128,8 +129,14 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
 
     val tableProperties = mutable.Map[String, String]()
    val properties: Map[String, String] = getPropertyKeyValues(tablePropertyList)
-    properties.foreach{property => tableProperties.put(property._1, property._2)}
 
+    properties.foreach { property =>
+      if (needToConvertToLowerCase(property._1)) {
+        tableProperties.put(property._1.toLowerCase, property._2.toLowerCase)
+      } else {
+        tableProperties.put(property._1.toLowerCase, property._2)
+      }
+    }
     // validate partition clause
    val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList)
     val partitionFields = CarbonSparkSqlParserUtil.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 35ef69c..54749c9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -564,7 +564,7 @@ object CarbonSparkSqlParserUtil {
   def needToConvertToLowerCase(key: String): Boolean = {
    var noConvertList = Array(CarbonCommonConstants.COMPRESSOR, "PATH", "bad_record_path",
       "timestampformat", "dateformat")
-    if (key.startsWith(CarbonCommonConstants.SPATIAL_INDEX) && key.endsWith(".class")) {
+    if (key.toLowerCase.startsWith(CarbonCommonConstants.SPATIAL_INDEX) && key.endsWith(".class")) {
       noConvertList = noConvertList ++ Array(key)
     }
     !noConvertList.exists(x => x.equalsIgnoreCase(key))
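
Taken together, the two parser changes mean table property keys are always
stored lower-cased, and values are lower-cased unless the key is exempt (for
example the spatial index ".class" value, which is a case-sensitive class
name). A simplified, hypothetical Java analog covering only the spatial-index
rule (the literal "spatial_index" stands in for
CarbonCommonConstants.SPATIAL_INDEX):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PropertyCaseDemo {
      // analog of needToConvertToLowerCase, reduced to the ".class" exemption
      static boolean needToConvertToLowerCase(String key) {
        return !(key.toLowerCase().startsWith("spatial_index") && key.endsWith(".class"));
      }

      static void put(Map<String, String> props, String key, String value) {
        // keys are always lower-cased; values only when the key allows it
        props.put(key.toLowerCase(), needToConvertToLowerCase(key) ? value.toLowerCase() : value);
      }

      public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        put(props, "SPATIAL_INDEX.mygeohash.gridSize", "50");
        put(props, "SPATIAL_INDEX.mygeohash.class", "org.apache.carbondata.geo.GeoHashIndex");
        // both keys come out lower-cased; only the .class value keeps its case
        System.out.println(props);
      }
    }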
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 2ea56db..628d690 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.geo
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -149,6 +150,56 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       s"the option(s): column_meta_cache"))
   }
 
+  test("test UDF's with invalid values") {
+    createTable()
+    val exception1 = intercept[RuntimeException](sql(
+      s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+      s"'RANGELIST (855279368848 855279368850, 855279368849 855279368852)', 45)").collect())
+    assert(exception1.getMessage.contains("Unsupported operation type 45"))
+
+    var exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
+          s"'linestring (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, " +
+          s"120.190359 30.315388)', 'x')").collect())
+    assert(exception2.getMessage.contains("Expect buffer size to be of float type"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
+          s"'linestring (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, " +
+          s"120.190359 30.315388)', -1)").collect())
+    assert(exception2.getMessage.contains("Expect buffer size to be a positive value"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select LatLngToGeoId(39930753, 116302895, 39.832277, -50) as geoId").collect())
+    assert(exception2.getMessage.contains("Expect grid size to be a positive integer"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select GeoIdToLatLng(855279270226, 39.832277, -50) as LatitudeAndLongitude").collect())
+    assert(exception2.getMessage.contains("Expect grid size to be a positive integer"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select ToRangeList('116.321011 40.123503, 116.320311 40.122503,116.321111 40.121503, " +
+          s"116.321011 40.123503', 39.832277, 0) as rangeList")
+        .collect())
+    assert(exception2.getMessage.contains("Expect grid size to be a positive integer"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select GeoIdToGridXy('X') as GridXY").collect())
+    assert(exception2.getMessage.contains("Expect geoId to be of long type"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select LatLngToGeoId('X', 'X', 'X', 'X') as geoId").collect())
+    assert(exception2.getMessage.contains("Expect latitude to be of long type"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select GeoIdToLatLng('X', 'X', 'X') as LatitudeAndLongitude").collect())
+    assert(exception2.getMessage.contains("Expect geoId to be of long type"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select ToUpperLayerGeoId('X') as upperLayerGeoId").collect())
+    assert(exception2.getMessage.contains("Expect geoId to be of long type"))
+  }
+
   test("test materialized view with spatial column") {
     createTable()
     val exception = intercept[MalformedCarbonCommandException](sql(
@@ -502,7 +553,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
         Row(120196020, 30321651)))
     checkAnswer(
       sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
-        s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 
30.314540," +
+        s"'polygon ((120.176433 30.327431,120.171283 30.322245,120.181411 
30.314540," +
         s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
         s"POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 
30.321464," +
         s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', " 
+
@@ -605,8 +656,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
         Row(120198638, 30323540)))
     checkAnswer(
       sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
-        s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 
30.324464), " +
-        s"LINESTRING (120.199242 30.324464, 120.190359 30.315388)', 65)"),
+        s"'linestring (120.184179 30.327465, 120.191603 30.328946, 120.199242 
30.324464), " +
+        s"linestring (120.199242 30.324464, 120.190359 30.315388)', 65)"),
       Seq(Row(120184976, 30327105),
         Row(120197093, 30325985),
         Row(120196020, 30321651),
@@ -660,7 +711,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
         Row(855282157702L, 116325378, 39963129)))
     checkAnswer(
       sql(s"select mygeohash, longitude, latitude from $table1 where 
IN_POLYGON_RANGE_LIST(" +
-        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+        s"'rangelist (855279368850 855279368852, 855280799610 855280799612, " +
         s"855282156300 855282157400), " +
         s"RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
         s"855282156301 855282157800)', " +
@@ -838,7 +889,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
            | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
            | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
            | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
-           | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+           | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000',
+           | 'SPATIAL_INDEX.mygeohash.class'='org.apache.carbondata.geo.GeoHashIndex')
        """.stripMargin)
   }
 
