This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5971417  [CARBONDATA-4051] Geo spatial index algorithm improvement and 
UDFs enhancement
5971417 is described below

commit 5971417bfd979fd284882d0af32fc8adf6e550fa
Author: shenjiayu17 <[email protected]>
AuthorDate: Thu Nov 19 15:30:24 2020 +0800

    [CARBONDATA-4051] Geo spatial index algorithm improvement and UDFs 
enhancement
    
    Why is this PR needed?
    Spatial index feature optimization of CarbonData
    
    What changes were proposed in this PR?
    1. Update the spatial index encoding algorithm, which reduces the
properties required to create a geo table.
    2. Enhance the geo query UDFs to support querying a geo table with a
polygon list, a polyline list, or a geoId range list, and add some geo
transformation util UDFs.
    3. Data loading (both LOAD and INSERT INTO) now allows the user to input
the spatial index; the column is still generated internally when the user
does not provide it.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4012
---
 .../apache/carbondata/core/util/CustomIndex.java   |   8 +
 docs/images/spatial-index-polygonlist.png          | Bin 0 -> 43631 bytes
 docs/images/spatial-index-polylinelist.png         | Bin 0 -> 20858 bytes
 docs/images/spatial-index-rangelist.png            | Bin 0 -> 64776 bytes
 docs/spatial-index-guide.md                        |  81 +++-
 .../org/apache/carbondata/geo/GeoConstants.java    |  30 ++
 .../org/apache/carbondata/geo/GeoHashIndex.java    | 248 +++-------
 .../org/apache/carbondata/geo/GeoHashUtils.java    | 409 ++++++++++++++++
 .../{GeoConstants.java => GeoOperationType.java}   |  28 +-
 .../org/apache/carbondata/geo/QuadTreeCls.java     |  80 ++--
 .../geo/scan/expression/PolygonExpression.java     |  45 +-
 .../geo/scan/expression/PolygonListExpression.java |  81 ++++
 .../expression/PolygonRangeListExpression.java     | 124 +++++
 .../scan/expression/PolylineListExpression.java    | 105 +++++
 .../filter/executor/PolygonFilterExecutorImpl.java |   9 +-
 .../apache/carbondata/geo/GeoHashUtilsTest.java    | 266 +++++++++++
 .../org/apache/carbondata/geo/QuadTreeClsTest.java |  47 +-
 .../org/apache/carbondata/geo/GeoUdfRegister.scala |  49 ++
 .../org/apache/carbondata/geo/GeoUtilUDFs.scala    |  66 +++
 .../org/apache/carbondata/geo/InPolygonUDF.scala   |  46 ++
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   9 +-
 .../strategy/CarbonLateDecodeStrategy.scala        |  25 +-
 .../apache/spark/sql/optimizer/CarbonFilters.scala |  15 +-
 integration/spark/src/test/resources/geodata2.csv  |  31 ++
 .../resources/geodataWithCorrectSpatialIndex.csv   |  17 +
 .../resources/geodataWithErrorSpatialIndex.csv     |  17 +
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 514 +++++++++++++++++++--
 .../impl/SpatialIndexFieldConverterImpl.java       |   4 +-
 .../loading/parser/impl/RowParserImpl.java         |   4 +-
 .../InputProcessorStepWithNoConverterImpl.java     |   8 -
 30 files changed, 2006 insertions(+), 360 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java 
b/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
index fb60051..0effbc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
@@ -60,6 +60,14 @@ public abstract class CustomIndex<ReturnType> implements 
Serializable {
   public abstract ReturnType query(String query) throws Exception;
 
   /**
+   * Query processor for custom index.
+   * @param queryPointList query point list for GeoHashIndex
+   * @return Returns list of ranges to be fetched
+   * @throws Exception
+   */
+  public abstract ReturnType query(List<double[]> queryPointList) throws 
Exception;
+
+  /**
    * Deserializes and returns the custom index instance
    * @param serializedInstance
    * @return
diff --git a/docs/images/spatial-index-polygonlist.png 
b/docs/images/spatial-index-polygonlist.png
new file mode 100644
index 0000000..8683af2
Binary files /dev/null and b/docs/images/spatial-index-polygonlist.png differ
diff --git a/docs/images/spatial-index-polylinelist.png 
b/docs/images/spatial-index-polylinelist.png
new file mode 100644
index 0000000..3aa7b71
Binary files /dev/null and b/docs/images/spatial-index-polylinelist.png differ
diff --git a/docs/images/spatial-index-rangelist.png 
b/docs/images/spatial-index-rangelist.png
new file mode 100644
index 0000000..d6682c4
Binary files /dev/null and b/docs/images/spatial-index-rangelist.png differ
diff --git a/docs/spatial-index-guide.md b/docs/spatial-index-guide.md
index 0f9ddcc..60bc4ba 100644
--- a/docs/spatial-index-guide.md
+++ b/docs/spatial-index-guide.md
@@ -32,12 +32,28 @@ Below figure shows the relationship between the grid and 
the points residing in
 ![File Directory Structure](../docs/images/spatial-index-1.png?raw=true)
 
Carbon supports a Polygon User Defined Function (UDF) as a filter condition in the query to return all the data points lying within the polygon. The Polygon UDF takes multiple points (i.e., longitude and latitude pairs) separated by commas; longitude and latitude within a pair are separated by a space. The first and last points of the polygon must be the same to form a closed loop. CarbonData builds a quad tree using this polygon and the spatial region information passed while creating a table. The nodes in the q [...]
-The main reasons for faster query response are as follows :
-* Data is sorted based on the index values.
-* Polygon UDF filter is pushed down from engine to the carbon layer such that 
CarbonData scans only matched blocklets avoiding full scan.
 
 ![File Directory Structure](../docs/images/spatial-index-2.png?raw=true)
 
+There are some other UDFs that support more filter conditions in the query, including Polygon List, Polyline List, and spatial index range list.
+ 
+Polygon List UDF takes multiple polygons (i.e., sets of points) and an operation type for combining them. Only `OR` and `AND` are supported at present: `OR` means the union of the polygons and `AND` means their intersection, as shown in the following figure. CarbonData then gets the list of index ranges for the combined region from the quad tree, using the same processing as the Polygon UDF.
+
+![File Directory 
Structure](../docs/images/spatial-index-polygonlist.png?raw=true)
+
+Polyline List UDF takes multiple polylines (i.e., sets of points) and a buffer width in meters. CarbonData first converts each polyline to a polygon and then gets the list of index ranges from these polygons. The processing is the same as for the Polygon UDF, and it returns all the data points lying within the buffer region of the polylines.
+
+![File Directory 
Structure](../docs/images/spatial-index-polylinelist.png?raw=true)
+
+Polygon Range List UDF takes multiple range lists and an operation type for merging them. **Range** is an area bounded by a start spatial index and an end spatial index (i.e., the minimum and maximum index of the range) in a quad tree. **Range List** is the internal representation of a range definition that may contain one or multiple polygons. The operation type includes `OR` and `AND` at present, meaning the union and intersection of the range lists. This UDF returns all the data points who [...]
+
+![File Directory 
Structure](../docs/images/spatial-index-rangelist.png?raw=true)
+
+The main reasons for faster query response are as follows:
+* Data is sorted based on the index values.
+* The above UDF filters are pushed down from the engine to the carbon layer so that CarbonData scans only the matched blocklets, avoiding a full scan.
+
+Besides, CarbonData also provides some spatial conversion util UDFs, such as converting a spatial index to grid coordinates x, y; converting a spatial index to a longitude and latitude pair; converting a longitude and latitude pair to a spatial index; converting a spatial index to the upper-layer spatial index of the pyramid model; and converting an input polygon string to a list of index ranges.
 
 # Installation and Deployment
 
@@ -56,10 +72,6 @@ create table source_index(id BIGINT, latitude long, 
longitude long) stored by 'c
 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
 'SPATIAL_INDEX.mygeohash.originLatitude'='19.832277',
 'SPATIAL_INDEX.mygeohash.gridSize'='50',
-'SPATIAL_INDEX.mygeohash.minLongitude'='1.811865',
-'SPATIAL_INDEX.mygeohash.maxLongitude'='2.782233',
-'SPATIAL_INDEX.mygeohash.minLatitude'='19.832277',
-'SPATIAL_INDEX.mygeohash.maxLatitude'='20.225281',
 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000');
 ```
 Note: 
@@ -74,11 +86,8 @@ Note:
 | SPATIAL_INDEX | Used to configure Spatial Index name. This name is appended 
to `SPATIAL_INDEX` in the subsequent sub-property configurations. `xxx` in the 
below sub-properties refer to index name. Generated spatial index column is not 
allowed in any properties except in `SORT_COLUMNS` table property.|
 | SPATIAL_INDEX.xxx.type | Type of algorithm for processing spatial data. 
Currently, supports only 'geohash'.|
 | SPATIAL_INDEX.xxx.sourcecolumns | longitude and latitude column names as in 
the table. These columns are used to generate index value for each row.|
+| SPATIAL_INDEX.xxx.originLatitude | Latitude of origin.|
 | SPATIAL_INDEX.xxx.gridSize | Grid size of raster data in metres. Currently, 
spatial index supports raster data.|
-| SPATIAL_INDEX.xxx.minLongitude | Minimum longitude of the gridded 
rectangular area.|
-| SPATIAL_INDEX.xxx.maxLongitude | Maximum longitude of the gridded 
rectangular area.|
-| SPATIAL_INDEX.xxx.minLatitude | Minimum latitude of the gridded rectangular 
area.|
-| SPATIAL_INDEX.xxx.maxLatitude | Maximum latitude of the gridded rectangular 
area.|
 | SPATIAL_INDEX.xxx.conversionRatio | Conversion factor. It allows the user to translate longitude and latitude to long. For example, if the data to load is longitude = 13.123456, latitude = 101.12356, the user can configure the conversion ratio sub-property value as 1000000 and change the data to load to longitude = 13123456 and latitude = 10112356. Operations on longs are much faster than on floating-point numbers.|
 | SPATIAL_INDEX.xxx.class | Optional user custom implementation class. Value 
is fully qualified class name.|
 
@@ -91,6 +100,56 @@ Query with Polygon UDF predicate
 select * from source_index where IN_POLYGON('16.321011 4.123503,16.137676 
5.947911,16.560993 5.935276,16.321011 4.123503')
 ```
 
+Query with Polygon List UDF predicate
+
+```
+select * from source_index where IN_POLYGON_LIST('POLYGON ((116.137676 
40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)), 
POLYGON ((116.560993 39.935276, 116.560993 40.163503, 116.137676 40.163503, 
116.560993 39.935276))', 'OR')
+```
+
+Query with Polyline List UDF predicate
+
+```
+select * from source_index where IN_POLYLINE_LIST('LINESTRING (116.137676 
40.163503, 116.137676 39.935276, 116.260993 39.935276), LINESTRING (116.260993 
39.935276, 116.560993 39.935276, 116.560993 40.163503)', 65)
+```
+
+Query with Polygon Range List UDF predicate
+
+```
+select * from source_index where IN_POLYGON_RANGE_LIST('RANGELIST 
(855279368848 855279368850, 855280799610 855280799612, 855282156300 
855282157400), RANGELIST (855279368852 855279368854, 855280799613 855280799615, 
855282156200 855282157500)', 'OR')
+```
+
+Convert spatial index to spatial grid x, y
+
+```
+select GeoIdToGridXy(mygeohash) as GridXY from source_index
+```
+
+Convert longitude and latitude pair to spatial index   
+The UDF needs two other parameters, oriLatitude and gridSize
+
+```
+select LatLngToGeoId(latitude, longitude, 39.832277, 50) as geoId from 
source_index
+```
+
+Convert spatial index to longitude and latitude pair   
+The UDF needs two other parameters, oriLatitude and gridSize
+
+```
+select GeoIdToLatLng(mygeohash, 39.832277, 50) as LatitudeAndLongitude from 
source_index
+```
+
+Convert spatial index to upper layer spatial index of pyramid model
+
+```
+select ToUpperLayerGeoId(mygeohash) as upperLayerGeoId from source_index
+```
+
+Convert string polygon to internal spatial index range list
+
+```
+select ToRangeList('116.321011 40.123503, 116.320311 40.122503, 116.321111 
40.121503, 116.321011 40.123503', 39.832277, 50) as rangeList
+```
+
 ## Reference
 
 ```
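The GeoIdToGridXy example above, and the createHashID method this commit removes further down, both revolve around the same Z-order (Morton) bit interleaving: row bits occupy the odd bit positions of the GeoID and column bits the even ones. Below is a minimal, self-contained Java sketch of that round trip; the class and method names are illustrative, not CarbonData APIs.

```java
public class ZOrderSketch {
  // Interleave: bit i of row goes to bit (2i+1), bit i of column to bit 2i,
  // mirroring the loop in the removed createHashID method.
  static long encode(long row, long column, int cutLevel) {
    long index = 0L;
    for (int i = 0; i < cutLevel + 1; i++) {
      long x = (row >> i) & 1;
      long y = (column >> i) & 1;
      index = index | (x << (2 * i + 1)) | (y << (2 * i));
    }
    return index;
  }

  // Inverse: gather the odd bits back into row and the even bits into column.
  static long[] decode(long index, int cutLevel) {
    long row = 0L;
    long column = 0L;
    for (int i = 0; i < cutLevel + 1; i++) {
      row |= ((index >> (2 * i + 1)) & 1) << i;
      column |= ((index >> (2 * i)) & 1) << i;
    }
    return new long[] {row, column};
  }

  public static void main(String[] args) {
    long geoId = encode(3, 5, 4);      // row = 0b11, column = 0b101
    long[] xy = decode(geoId, 4);
    System.out.println(geoId + " -> row " + xy[0] + ", column " + xy[1]);
  }
}
```

Because the interleaving is bijective, decode(encode(r, c)) always returns the original grid cell, which is what lets the UDFs translate freely between GeoIDs and grid coordinates.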
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java 
b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
index d9641cf..2b1b6b7 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
@@ -26,4 +26,34 @@ public class GeoConstants {
 
   // GeoHash type Spatial Index
   public static final String GEOHASH = "geohash";
+
+  // Regular expression to parse input polygons for IN_POLYGON_LIST
+  public static final String POLYGON_REG_EXPRESSION = "(?<=POLYGON 
\\(\\()(.*?)(?=(\\)\\)))";
+
+  // Regular expression to parse input polylines for IN_POLYLINE_LIST
+  public static final String POLYLINE_REG_EXPRESSION = "LINESTRING \\(.*?\\)";
+
+  // Regular expression to parse input rangelists for IN_POLYGON_RANGE_LIST
+  public static final String RANGELIST_REG_EXPRESSION = "(?<=RANGELIST 
\\()(.*?)(?=\\))";
+
+  // delimiter of input points or ranges
+  public static final String DEFAULT_DELIMITER = ",";
+
+  // conversion factor of angle to radian
+  public static final double CONVERT_FACTOR = 180.0;
+
+  // Earth radius
+  public static final double EARTH_RADIUS = 6371004.0;
+
+  // used in Geo Hash calculation formula for improving calculation accuracy
+  public static final int CONVERSION_RATIO = 100000000;
+
+  // used to multiply input longitude and latitude values that were already scaled by 10^6
+  public static final int CONVERSION_FACTOR_FOR_ACCURACY = 100;
+
+  // used in transforming UDF geoID2LngLat, set scale of BigDecimal
+  public static final int SCALE_OF_LONGITUDE_AND_LATITUDE = 6;
+
+  // Length in meters of 1 degree of latitude
+  public static final double CONVERSION_FACTOR_OF_METER_TO_DEGREE = 111320;
 }
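The three regular expressions added above feed the new list-style UDFs. As a quick illustration of what POLYGON_REG_EXPRESSION captures, here is a small self-contained Java sketch; the class and helper names are hypothetical, and the actual parsing code lives elsewhere in this patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PolygonParseSketch {
  // Same pattern as GeoConstants.POLYGON_REG_EXPRESSION: a lookbehind for
  // "POLYGON ((" and a lookahead for "))" capture only the point list inside.
  static final Pattern POLYGON =
      Pattern.compile("(?<=POLYGON \\(\\()(.*?)(?=(\\)\\)))");

  // Extract the comma-separated point list of every polygon in the input.
  static List<String> extractPolygons(String input) {
    List<String> polygons = new ArrayList<>();
    Matcher m = POLYGON.matcher(input);
    while (m.find()) {
      polygons.add(m.group(1));
    }
    return polygons;
  }

  public static void main(String[] args) {
    String in = "POLYGON ((116.1 40.1, 116.1 39.9, 116.5 39.9, 116.1 40.1)), "
        + "POLYGON ((116.5 39.9, 116.5 40.1, 116.1 40.1, 116.5 39.9))";
    System.out.println(extractPolygons(in));
  }
}
```

The reluctant `(.*?)` stops at the first `))`, so each `POLYGON ((...))` term in an IN_POLYGON_LIST argument yields exactly one match.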
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java 
b/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
index b361e03..4a5e892 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.geo;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -44,24 +43,16 @@ public class GeoHashIndex extends CustomIndex<List<Long[]>> 
{
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(GeoHashIndex.class.getName());
 
-  // conversion factor of angle to radian
-  private static final double CONVERT_FACTOR = 180.0;
-  // Earth radius
-  private static final double EARTH_RADIUS = 6371004.0;
   // Latitude of coordinate origin
   private double oriLatitude;
-  // User defined maximum longitude of map
-  private double userDefineMaxLongitude;
-  // User defined maximum latitude of map
-  private double userDefineMaxLatitude;
-  // User defined map minimum longitude
-  private double userDefineMinLongitude;
-  // User defined map minimum latitude
-  private double userDefineMinLatitude;
+  // The minimum longitude of the completed map after calculation
+  private double calculateMinLongitude;
+  // The minimum latitude of the completed map after calculation
+  private double calculateMinLatitude;
   // The maximum longitude of the completed map after calculation
-  private double CalculateMaxLongitude;
+  private double calculateMaxLongitude;
   // The maximum latitude of the completed map after calculation
-  private double CalculateMaxLatitude;
+  private double calculateMaxLatitude;
   // Grid length is in meters
   private int gridSize;
   // cos value of latitude of origin of coordinate
@@ -70,19 +61,11 @@ public class GeoHashIndex extends CustomIndex<List<Long[]>> 
{
   private double deltaY;
   // Each grid size length should be the degree of X axis
   private double deltaX;
-  // Degree * coefficient of Y axis corresponding to each grid size length
-  private double deltaYByRatio;
-  // Each grid size length should be X-axis Degree * coefficient
-  private double deltaXByRatio;
   // The number of knives cut for the whole area (one horizontally and one 
vertically)
   // is the depth of quad tree
   private int cutLevel;
   // used to convert the latitude and longitude of double type to int type for 
calculation
   private int conversionRatio;
-  // * Constant of coefficient
-  private double lon0ByRation;
-  // * Constant of coefficient
-  private double lat0ByRation;
 
 
   /**
@@ -92,10 +75,6 @@ public class GeoHashIndex extends CustomIndex<List<Long[]>> {
    * 'SPATIAL_INDEX.mygeohash.type'='geohash',
    * 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
    * 'SPATIAL_INDEX.mygeohash.gridSize'=''
-   * 'SPATIAL_INDEX.mygeohash.minLongitude'=''
-   * 'SPATIAL_INDEX.mygeohash.maxLongitude'=''
-   * 'SPATIAL_INDEX.mygeohash.minLatitude'=''
-   * 'SPATIAL_INDEX.mygeohash.maxLatitude'=''
    * 'SPATIAL_INDEX.mygeohash.orilatitude''')
   * @param indexName index name. Implicitly a column is created with index name.
   * @param properties input properties; please check the description above
@@ -155,34 +134,6 @@ public class GeoHashIndex extends 
CustomIndex<List<Long[]>> {
               String.format("%s property is invalid. Must specify %s 
property.",
                       CarbonCommonConstants.SPATIAL_INDEX, ORIGIN_LATITUDE));
     }
-    String MIN_LONGITUDE = commonKey + "minlongitude";
-    String MAX_LONGITUDE = commonKey + "maxlongitude";
-    String MIN_LATITUDE = commonKey + "minlatitude";
-    String MAX_LATITUDE = commonKey + "maxlatitude";
-    String minLongitude = properties.get(MIN_LONGITUDE);
-    String maxLongitude = properties.get(MAX_LONGITUDE);
-    String minLatitude = properties.get(MIN_LATITUDE);
-    String maxLatitude = properties.get(MAX_LATITUDE);
-    if (StringUtils.isEmpty(minLongitude)) {
-      throw new MalformedCarbonCommandException(
-          String.format("%s property is invalid. Must specify %s property.",
-              CarbonCommonConstants.SPATIAL_INDEX, MIN_LONGITUDE));
-    }
-    if (StringUtils.isEmpty(minLatitude)) {
-      throw new MalformedCarbonCommandException(
-          String.format("%s property is invalid. Must specify %s property.",
-              CarbonCommonConstants.SPATIAL_INDEX, MIN_LATITUDE));
-    }
-    if (StringUtils.isEmpty(maxLongitude)) {
-      throw new MalformedCarbonCommandException(
-          String.format("%s property is invalid. Must specify %s property.",
-              CarbonCommonConstants.SPATIAL_INDEX, MAX_LONGITUDE));
-    }
-    if (StringUtils.isEmpty(maxLatitude)) {
-      throw new MalformedCarbonCommandException(
-          String.format("%s property is invalid. Must specify %s property.",
-              CarbonCommonConstants.SPATIAL_INDEX, MAX_LATITUDE));
-    }
     String GRID_SIZE = commonKey + "gridsize";
     String gridSize = properties.get(GRID_SIZE);
     if (StringUtils.isEmpty(gridSize)) {
@@ -200,13 +151,9 @@ public class GeoHashIndex extends 
CustomIndex<List<Long[]>> {
 
     // Fill the values to the instance fields
     this.oriLatitude = Double.valueOf(originLatitude);
-    this.userDefineMaxLongitude = Double.valueOf(maxLongitude);
-    this.userDefineMaxLatitude = Double.valueOf(maxLatitude);
-    this.userDefineMinLongitude = Double.valueOf(minLongitude);
-    this.userDefineMinLatitude = Double.valueOf(minLatitude);
     this.gridSize = Integer.parseInt(gridSize);
     this.conversionRatio = Integer.parseInt(conversionRatio);
-    calculateData();
+    calculateInitialArea();
   }
 
   /**
@@ -230,8 +177,9 @@ public class GeoHashIndex extends CustomIndex<List<Long[]>> 
{
     Long longitude = (Long) sources.get(0);
     Long latitude  = (Long) sources.get(1);
     // generate the hash code
-    int[] gridPoint = calculateID(longitude, latitude);
-    Long hashId = createHashID(gridPoint[0], gridPoint[1]);
+    long longitudeByRatio = longitude * (GeoConstants.CONVERSION_RATIO / this.conversionRatio);
+    long latitudeByRatio = latitude * (GeoConstants.CONVERSION_RATIO / this.conversionRatio);
+    Long hashId = lonLat2GeoID(longitudeByRatio, latitudeByRatio, this.oriLatitude, this.gridSize);
     return String.valueOf(hashId);
   }
 
@@ -245,48 +193,20 @@ public class GeoHashIndex extends 
CustomIndex<List<Long[]>> {
    */
   @Override
   public List<Long[]> query(String polygon) throws Exception {
-    if (!validate(polygon)) {
-      return null;
-    } else {
-      String[] pointList = polygon.trim().split(",");
-      List<double[]> queryList = new ArrayList<>();
-      for (String str: pointList) {
-        String[] points = splitString(str);
-        if (2 != points.length) {
-          throw new RuntimeException("longitude and latitude is a pair need 2 
data");
-        } else {
-          try {
-            queryList.add(new double[] {Double.valueOf(points[0]), 
Double.valueOf(points[1])});
-          } catch (NumberFormatException e) {
-            throw new RuntimeException("can not covert the string data to 
double", e);
-          }
-        }
-      }
-      if (!checkPointsSame(pointList[0], pointList[pointList.length - 1])) {
-        throw new RuntimeException("the first point and last point in polygon 
should be same");
-      } else {
-        List<Long[]> rangeList = getPolygonRangeList(queryList);
-        return rangeList;
-      }
-    }
-  }
-
-  private String[] splitString(String str) {
-    return str.trim().split("\\s+");
+    List<double[]> queryList = GeoHashUtils.getPointListFromPolygon(polygon);
+    return getPolygonRangeList(queryList);
   }
 
-  private boolean checkPointsSame(String point1, String point2) throws 
Exception {
-    String[] points1 = splitString(point1);
-    String[] points2 = splitString(point2);
-    return points1[0].equals(points2[0]) && points1[1].equals(points2[1]);
-  }
-
-  private boolean validate(String polygon) throws Exception {
-    String[] pointList = polygon.trim().split(",");
-    if (4 > pointList.length) {
-      throw new RuntimeException("polygon need at least 3 points, really has " 
+ pointList.length);
-    }
-    return true;
+  /**
+   * Query processor for GeoHash.
+   * example: [[35, 10], [45, 45], [15, 40], [10, 20], [35, 10]]
+   * @param queryPointList point list of a polygon, closed to form an area
+   * @return Returns list of ranges of GeoHash IDs
+   * @throws Exception
+   */
+  @Override
+  public List<Long[]> query(List<double[]> queryPointList) throws Exception {
+    return getPolygonRangeList(queryPointList);
   }
 
   /**
@@ -296,108 +216,48 @@ public class GeoHashIndex extends 
CustomIndex<List<Long[]>> {
    * @throws Exception
    */
   private  List<Long[]> getPolygonRangeList(List<double[]> queryList) throws 
Exception {
-    QuadTreeCls qTreee = new QuadTreeCls(userDefineMinLongitude, 
userDefineMinLatitude,
-        CalculateMaxLongitude, CalculateMaxLatitude, cutLevel);
+    QuadTreeCls qTreee = new QuadTreeCls(calculateMinLongitude, 
calculateMinLatitude,
+        calculateMaxLongitude, calculateMaxLatitude, cutLevel, 
(int)Math.log10(conversionRatio));
     qTreee.insert(queryList);
     return qTreee.getNodesData();
   }
 
   /**
-   *  After necessary attributes, perform necessary calculation
-   * @throws Exception
-   */
-  private void calculateData() throws Exception {
-    // Angular to radian, radians = (Math.PI / 180) * degrees
-    // Cosine value of latitude angle of origin
-    this.mCos = Math.cos(this.oriLatitude / this.conversionRatio * Math.PI / 
CONVERT_FACTOR);
-    // get δx=L∗360/(2πR∗cos(lat))
-    this.deltaX = (this.gridSize * 360) / (2 * Math.PI * EARTH_RADIUS * 
this.mCos);
-    this.deltaXByRatio = this.deltaX * this.conversionRatio;
-    // get δy=L∗360/2πR
-    this.deltaY = (this.gridSize * 360) / (2 * Math.PI * EARTH_RADIUS);
-    this.deltaYByRatio = this.deltaY * this.conversionRatio;
-    LOGGER.info("after spatial calculate delta X is: " + String.format("%f", 
this.deltaX)
-                    + "the delta Y is: " + String.format("%f", this.deltaY));
-    LOGGER.info("after spatial calculate X ByRatio is: " + String.format("%f", 
this.deltaXByRatio)
-                    + "the Y ByRatio is: " + String.format("%f", 
this.deltaYByRatio));
-    // Calculate the complement area and grid i,j for grid number
-    // Xmax = x0+(2^n∗δx) Ymax = y0+(2^n∗δx) Where n is the number of cut
-    // Where x0, Y0 are the minimum x, y coordinates of a given region,
-    // Xmax >= maxLongitude Ymax >= maxLatitude
-    // In the calculation process, first substitute maxlongitude and 
maxlatitude to calculate n.
-    // if n is not an integer, then take the next integer of N, and then 
substitute to find
-    // xmax and ymax。
-    this.calculateArea();
-  }
-
-  /**
-   * Calculate the complement area, including the range of the complement 
area, t
-   * he number of knives cut and the depth of the quad tree
+   * Calculate the initial partition area and the variables that are used to generate the GeoID,
+   * including the minLatitude, maxLatitude, minLongitude and maxLongitude of the partition area,
+   * and the number of knives cut, which is also the depth of the quad tree.
    */
-  private void calculateArea() {
-    // step 1 calculate xn, yn by using maxLongitude, maxLatitude, 
minLongitude, minLatitude
-    // substitution formula
-    // Here, the user's given area is mostly rectangle, which needs to be 
extended to
-    // square processing to find the maximum value of XN and yn
-    // n=log_2 (Xmax−X0)/δx, log_2 (Ymax−Y0)/δy
-    userDefineMinLongitude = userDefineMinLongitude - deltaX / 2;
-    userDefineMaxLongitude = userDefineMaxLongitude + deltaX / 2;
-    userDefineMinLatitude = userDefineMinLatitude - deltaY / 2;
-    userDefineMaxLatitude = userDefineMaxLatitude + deltaY / 2;
-
-    this.lon0ByRation = userDefineMinLongitude * this.conversionRatio;
-    this.lat0ByRation = userDefineMinLatitude * this.conversionRatio;
-
-    double Xn = Math.log((userDefineMaxLongitude - userDefineMinLongitude) / 
deltaX)
-                    / Math.log(2);
-    double Yn = Math.log((userDefineMaxLatitude - userDefineMinLatitude) / 
deltaY)
-                    / Math.log(2);
-    double doubleMax = Math.max(Xn, Yn);
-    this.cutLevel = doubleMax % 1 == 0 ? (int) doubleMax : (int) (doubleMax + 
1);
-    // step 2 recalculate the region according to the number of segmentation
-    this.CalculateMaxLongitude = userDefineMinLongitude + Math.pow(2, 
this.cutLevel)
-                                                              * deltaX;
-    this.CalculateMaxLatitude = userDefineMinLatitude + Math.pow(2, 
this.cutLevel)
-                                                            * deltaY;
+  private void calculateInitialArea() {
+    this.cutLevel = GeoHashUtils.getCutCount(gridSize, oriLatitude);
+    this.deltaX = GeoHashUtils.getDeltaX(oriLatitude, gridSize);
+    this.deltaY = GeoHashUtils.getDeltaY(gridSize);
+    double maxLatitudeOfInitialArea = this.deltaY * Math.pow(2, this.cutLevel 
- 1);
+    this.mCos = Math.cos(oriLatitude * Math.PI / GeoConstants.CONVERT_FACTOR);
+    double maxLongitudeOfInitialArea = maxLatitudeOfInitialArea / mCos;
+    this.calculateMinLatitude = -maxLatitudeOfInitialArea;
+    this.calculateMaxLatitude = maxLatitudeOfInitialArea;
+    this.calculateMinLongitude = -maxLongitudeOfInitialArea;
+    this.calculateMaxLongitude = maxLongitudeOfInitialArea;
+    LOGGER.info("after spatial calculate delta X is: " + String.format("%f", 
this.deltaX) +
+        "the delta Y is: " + String.format("%f", this.deltaY));
     LOGGER.info("After spatial calculate the cut level is: " + 
String.format("%d", this.cutLevel));
-    LOGGER.info("the min longitude is: " + String.format("%f", 
this.userDefineMinLongitude) +
-                    " the max longitude is: " + String.format("%f", 
this.CalculateMaxLongitude));
-    LOGGER.info("the min latitude is: " + String.format("%f", 
this.userDefineMinLatitude) +
-                    " the max latitude is: " + String.format("%f", 
this.CalculateMaxLatitude));
+    LOGGER.info("the min longitude is: " + String.format("%f", 
this.calculateMinLongitude) +
+        " the max longitude is: " + String.format("%f", 
this.calculateMaxLongitude));
+    LOGGER.info("the min latitude is: " + String.format("%f", 
this.calculateMinLatitude) +
+        " the max latitude is: " + String.format("%f", 
this.calculateMaxLatitude));
   }
 
   /**
-   * Through grid index coordinates and calculation of hashid, grid latitude 
and longitude
-   * coordinates can be transformed by latitude and longitude
-   * @param longitude Longitude, the actual longitude and latitude are 
processed by * coefficient,
-   *                  and the floating-point calculation is converted to 
integer calculation
-   * @param latitude Latitude, the actual longitude and latitude are processed 
by * coefficient,
-   *                 and the floating-point calculation is converted to 
integer calculation.
-   * @return Grid ID value [row, column] column starts from 1
+   * Transform longitude and latitude to geo id
+   *
+   * @param longitude the point longitude
+   * @param latitude the point latitude
+   * @param oriLatitude the origin point latitude
+   * @param gridSize the grid size
+   * @return GeoID
    */
-  private int[] calculateID(long longitude, long latitude) throws Exception {
-    try {
-      int row = (int) ((longitude - this.lon0ByRation) / this.deltaXByRatio);
-      int column = (int) ((latitude - this.lat0ByRation) / this.deltaYByRatio);
-      return new int[]{row, column};
-    } catch (ArithmeticException  e) {
-      throw new RuntimeException("can not divide by zero.");
-    }
-  }
-
-  /**
-   * Calculate the corresponding hashid value from the grid coordinates
-   * @param row Gridded row index
-   * @param column Gridded column index
-   * @return hash id
-   */
-  private long createHashID(long row, long column) {
-    long index = 0L;
-    for (int i = 0; i < cutLevel + 1; i++) {
-      long x = (row >> i) & 1;    // take position I
-      long y = (column >> i) & 1;
-      index = index | (x << (2 * i + 1)) | (y << 2 * i);
-    }
-    return index;
+  private long lonLat2GeoID(long longitude, long latitude, double oriLatitude, 
int gridSize) {
+    int[] ij = GeoHashUtils.lonLat2ColRow(longitude, latitude, oriLatitude, 
gridSize);
+    return GeoHashUtils.colRow2GeoID(ij[0], ij[1]);
   }
 }
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java 
b/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
new file mode 100644
index 0000000..e5e7a78
--- /dev/null
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class GeoHashUtils {
+
+  /**
+   * Get the degree of each grid in the east-west direction.
+   *
+   * @param originLatitude the origin point latitude
+   * @param gridSize the grid size
+   * @return Delta X is the degree of each grid in the east-west direction
+   */
+  public static double getDeltaX(double originLatitude, int gridSize) {
+    double mCos = Math.cos(originLatitude * Math.PI / GeoConstants.CONVERT_FACTOR);
+    return (GeoConstants.CONVERT_FACTOR * gridSize) / (Math.PI * GeoConstants.EARTH_RADIUS * mCos);
+  }
+
+  /**
+   * Get the degree of each grid in the north-south direction.
+   *
+   * @param gridSize the grid size
+   * @return Delta Y is the degree of each grid in the north-south direction
+   */
+  public static double getDeltaY(int gridSize) {
+    return (GeoConstants.CONVERT_FACTOR * gridSize) / (Math.PI * GeoConstants.EARTH_RADIUS);
+  }
+
+  /**
+   * Calculate the number of cuts, i.e. the quad-tree depth needed to reach the grid size
+   *
+   * @param gridSize the grid size
+   * @param originLatitude the origin point latitude
+   * @return the number of cuts
+   */
+  public static int getCutCount(int gridSize, double originLatitude) {
+    double deltaX = getDeltaX(originLatitude, gridSize);
+    int countX = Double.valueOf(
+        Math.ceil(Math.log(2 * GeoConstants.CONVERT_FACTOR / deltaX) / Math.log(2))).intValue();
+    double deltaY = getDeltaY(gridSize);
+    int countY = Double.valueOf(
+        Math.ceil(Math.log(GeoConstants.CONVERT_FACTOR / deltaY) / Math.log(2))).intValue();
+    return Math.max(countX, countY);
+  }
+
+  /**
+   * Convert input longitude and latitude to GeoID
+   *
+   * @param longitude Longitude, scaled to an integer (the raw value is multiplied by a
+   *                  coefficient so that floating-point calculation becomes integer calculation)
+   * @param latitude Latitude, scaled in the same way as the longitude
+   * @param oriLatitude the origin point latitude
+   * @param gridSize the grid size
+   * @return GeoID
+   */
+  public static long lonLat2GeoID(long longitude, long latitude, double oriLatitude, int gridSize) {
+    long longitudeByRatio = longitude * GeoConstants.CONVERSION_FACTOR_FOR_ACCURACY;
+    long latitudeByRatio = latitude * GeoConstants.CONVERSION_FACTOR_FOR_ACCURACY;
+    int[] ij = lonLat2ColRow(longitudeByRatio, latitudeByRatio, oriLatitude, gridSize);
+    return colRow2GeoID(ij[0], ij[1]);
+  }
+
+  /**
+   * Calculate the grid coordinates from longitude and latitude; the resulting row and
+   * column index the grid that contains the point
+   *
+   * @param longitude Longitude, scaled to an integer (the raw value is multiplied by a
+   *                  coefficient so that floating-point calculation becomes integer calculation)
+   * @param latitude Latitude, scaled in the same way as the longitude
+   * @param oriLatitude the latitude of the origin point, which is used to calculate deltaX
+   *                    and the cut level
+   * @param gridSize the size of the minimal grid after cutting
+   * @return grid coordinates as [row, column]
+   */
+  public static int[] lonLat2ColRow(long longitude, long latitude, double oriLatitude,
+      int gridSize) {
+    int cutLevel = getCutCount(gridSize, oriLatitude);
+    int column = (int) Math.floor(longitude / getDeltaX(oriLatitude, gridSize) /
+        GeoConstants.CONVERSION_RATIO) + (1 << (cutLevel - 1));
+    int row = (int) Math.floor(latitude / getDeltaY(gridSize) /
+        GeoConstants.CONVERSION_RATIO) + (1 << (cutLevel - 1));
+    return new int[] {row, column};
+  }
+
+  /**
+   * Calculate the corresponding GeoID value from the grid coordinates
+   *
+   * @param row Gridded row index
+   * @param column Gridded column index
+   * @return GeoID
+   */
+  public static long colRow2GeoID(int row, int column) {
+    long geoID = 0L;
+    int bit = 0;
+    long sourceRow = (long) row;
+    long sourceColumn = (long) column;
+    while (sourceRow > 0 || sourceColumn > 0) {
+      geoID = geoID | ((sourceRow & 1) << (2 * bit + 1)) | ((sourceColumn & 1) << 2 * bit);
+      sourceRow >>= 1;
+      sourceColumn >>= 1;
+      bit++;
+    }
+    return geoID;
+  }
+
+  /**
+   * Convert input GeoID to longitude and latitude
+   *
+   * @param geoId GeoID
+   * @param oriLatitude the origin point latitude
+   * @param gridSize the grid size
+   * @return [latitude, longitude] of the grid center point
+   */
+  public static double[] geoID2LatLng(long geoId, double oriLatitude, int gridSize) {
+    int[] rowCol = geoID2ColRow(geoId);
+    int column = rowCol[1];
+    int row = rowCol[0];
+    int cutLevel = getCutCount(gridSize, oriLatitude);
+    double deltaX = getDeltaX(oriLatitude, gridSize);
+    double deltaY = getDeltaY(gridSize);
+    double longitude = (column - (1 << (cutLevel - 1)) + 0.5) * deltaX;
+    double latitude = (row - (1 << (cutLevel - 1)) + 0.5) * deltaY;
+    longitude = new BigDecimal(longitude).setScale(GeoConstants.SCALE_OF_LONGITUDE_AND_LATITUDE,
+        BigDecimal.ROUND_HALF_UP).doubleValue();
+    latitude = new BigDecimal(latitude).setScale(GeoConstants.SCALE_OF_LONGITUDE_AND_LATITUDE,
+        BigDecimal.ROUND_HALF_UP).doubleValue();
+    return new double[]{latitude, longitude};
+  }
+
+  /**
+   * Convert input GeoID to grid row and column
+   *
+   * @param geoId GeoID
+   * @return grid coordinates as [row, column]
+   */
+  public static int[] geoID2ColRow(long geoId) {
+    int row = 0;
+    int column = 0;
+    int bit = 0;
+    long source = geoId;
+    while (source > 0) {
+      column |= (source & 1) << bit;
+      source >>= 1;
+      row |= (source & 1) << bit;
+      source >>= 1;
+      bit++;
+    }
+    return new int[] {row, column};
+  }
+
+  /**
+   * Convert input polygon string to a GeoID range list
+   *
+   * @param polygon input polygon string
+   * @param oriLatitude the origin point latitude
+   * @param gridSize the grid size
+   * @return GeoID range list of the polygon
+   */
+  public static List<Long[]> getRangeList(String polygon, double oriLatitude, int gridSize) {
+    List<double[]> queryPointList = getPointListFromPolygon(polygon);
+    int cutLevel = getCutCount(gridSize, oriLatitude);
+    double deltaY = getDeltaY(gridSize);
+    double maxLatitudeOfInitialArea = deltaY * Math.pow(2, cutLevel - 1);
+    double mCos = Math.cos(oriLatitude * Math.PI / GeoConstants.CONVERT_FACTOR);
+    double maxLongitudeOfInitialArea = maxLatitudeOfInitialArea / mCos;
+    double minLatitudeOfInitialArea = -maxLatitudeOfInitialArea;
+    double minLongitudeOfInitialArea = -maxLongitudeOfInitialArea;
+    QuadTreeCls qTree = new QuadTreeCls(minLongitudeOfInitialArea, minLatitudeOfInitialArea,
+        maxLongitudeOfInitialArea, maxLatitudeOfInitialArea, cutLevel,
+        GeoConstants.SCALE_OF_LONGITUDE_AND_LATITUDE);
+    qTree.insert(queryPointList);
+    return qTree.getNodesData();
+  }
+
+  /**
+   * Convert input GeoID to upper layer GeoID of pyramid
+   *
+   * @param geoId GeoID
+   * @return the upper layer GeoID
+   */
+  public static long convertToUpperLayerGeoId(long geoId) {
+    return geoId >> 2;
+  }
+
+  /**
+   * Parse an input polygon string into a point list
+   *
+   * @param polygon input polygon string, for example: POLYGON (35 10, 45 45, 15 40, 10 20, 35 10)
+   * @return the point list
+   */
+  public static List<double[]> getPointListFromPolygon(String polygon) {
+    String[] pointStringList = polygon.trim().split(GeoConstants.DEFAULT_DELIMITER);
+    if (4 > pointStringList.length) {
+      throw new RuntimeException(
+          "polygon needs at least 3 points, actually has " + pointStringList.length);
+    }
+    List<double[]> queryList = new ArrayList<>();
+    for (String pointString : pointStringList) {
+      String[] point = splitStringToPoint(pointString);
+      if (2 != point.length) {
+        throw new RuntimeException("a point must be a pair of exactly 2 values:"
+            + " longitude and latitude");
+      }
+      try {
+        queryList.add(new double[] {Double.valueOf(point[0]), Double.valueOf(point[1])});
+      } catch (NumberFormatException e) {
+        throw new RuntimeException("cannot convert the string data to double", e);
+      }
+    }
+    if (!checkPointsSame(pointStringList[0], pointStringList[pointStringList.length - 1])) {
+      throw new RuntimeException("the first and last points of the polygon must be the same");
+    } else {
+      return queryList;
+    }
+  }
+
+  private static boolean checkPointsSame(String point1, String point2) {
+    String[] points1 = splitStringToPoint(point1);
+    String[] points2 = splitStringToPoint(point2);
+    return points1[0].equals(points2[0]) && points1[1].equals(points2[1]);
+  }
+
+  public static String[] splitStringToPoint(String str) {
+    return str.trim().split("\\s+");
+  }
+
+  public static void validateRangeList(List<Long[]> ranges) {
+    for (Long[] range : ranges) {
+      if (range.length != 2) {
+        throw new RuntimeException("Query processor must return list of ranges with each range "
+            + "containing minimum and maximum values");
+      }
+    }
+  }
+
+  /**
+   * Get the union or intersection of two polygons' GeoID range lists
+   *
+   * @param rangeListA geoId range list of polygonA
+   * @param rangeListB geoId range list of polygonB
+   * @param opType operation type, OR for union and AND for intersection
+   * @return geoId range list of the processed set
+   */
+  public static List<Long[]> processRangeList(List<Long[]> rangeListA, List<Long[]> rangeListB,
+      String opType) {
+    List<Long[]> processedRangeList;
+    GeoOperationType operationType = GeoOperationType.getEnum(opType);
+    if (operationType == null) {
+      throw new RuntimeException("Unsupported operation type " + opType);
+    }
+    switch (operationType) {
+      case OR:
+        processedRangeList = getPolygonUnion(rangeListA, rangeListB);
+        break;
+      case AND:
+        processedRangeList = getPolygonIntersection(rangeListA, rangeListB);
+        break;
+      default:
+        throw new RuntimeException("Unsupported operation type " + opType);
+    }
+    return processedRangeList;
+  }
+
+  /**
+   * Get the union of two polygons' GeoID range lists
+   *
+   * @param rangeListA geoId range list of polygonA
+   * @param rangeListB geoId range list of polygonB
+   * @return geoId range list after the union
+   */
+  private static List<Long[]> getPolygonUnion(List<Long[]> rangeListA, List<Long[]> rangeListB) {
+    if (Objects.isNull(rangeListA)) {
+      return rangeListB;
+    }
+    if (Objects.isNull(rangeListB)) {
+      return rangeListA;
+    }
+    int sizeFirst = rangeListA.size();
+    int sizeSecond = rangeListB.size();
+    if (sizeFirst > sizeSecond) {
+      rangeListA.addAll(sizeFirst, rangeListB);
+      return mergeList(rangeListA);
+    } else {
+      rangeListB.addAll(sizeSecond, rangeListA);
+      return mergeList(rangeListB);
+    }
+  }
+
+  private static List<Long[]> mergeList(List<Long[]> list) {
+    if (list.size() == 0) {
+      return list;
+    }
+    Collections.sort(list, new Comparator<Long[]>() {
+      @Override
+      public int compare(Long[] arr1, Long[] arr2) {
+        return Long.compare(arr1[0], arr2[0]);
+      }
+    });
+    Long[] min;
+    Long[] max;
+    for (int i = 0; i < list.size(); i++) {
+      min = list.get(i);
+      for (int j = i + 1; j < list.size(); j++) {
+        max = list.get(j);
+        if (min[1] + 1 >= max[0]) {
+          min[1] = Math.max(max[1], min[1]);
+          list.remove(j);
+          j--;
+        } else {
+          break;
+        }
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Get the intersection of two polygons' GeoID range lists
+   *
+   * @param rangeListA geoId range list of polygonA
+   * @param rangeListB geoId range list of polygonB
+   * @return geoId range list after the intersection
+   */
+  private static List<Long[]> getPolygonIntersection(List<Long[]> rangeListA,
+      List<Long[]> rangeListB) {
+    List<Long[]> intersectionList = new ArrayList<>();
+    if (Objects.isNull(rangeListA) || Objects.isNull(rangeListB)) {
+      return Collections.emptyList();
+    }
+    int endIndex1 = rangeListA.size();
+    int endIndex2 = rangeListB.size();
+    int startIndex1 = 0;
+    int startIndex2 = 0;
+
+    long start = 0;
+    long end = 0;
+    while (startIndex1 < endIndex1 && startIndex2 < endIndex2) {
+      start = Math.max(rangeListA.get(startIndex1)[0], rangeListB.get(startIndex2)[0]);
+      if (rangeListA.get(startIndex1)[1] < rangeListB.get(startIndex2)[1]) {
+        end = rangeListA.get(startIndex1)[1];
+        startIndex1++;
+      } else {
+        end = rangeListB.get(startIndex2)[1];
+        startIndex2++;
+      }
+      if (start <= end) {
+        intersectionList.add(new Long[]{start, end});
+      }
+    }
+    return intersectionList;
+  }
+
+  /**
+   * Evaluate whether the search value is in the GeoId ranges.
+   * If the search value is greater than or equal to the minimum of one range,
+   * i.e. ranges.get(i)[0], and less than or equal to the maximum of this range,
+   * i.e. ranges.get(i)[1], return true, otherwise false.
+   *
+   * Used in method evaluate(RowIntf value) of PolygonExpression, PolygonListExpression,
+   * PolylineListExpression, PolygonRangeListExpression.
+   *
+   * @param ranges GeoId ranges of expression, including PolygonExpression, PolygonListExpression,
+   *               PolylineListExpression and PolygonRangeListExpression
+   * @param searchForNumber the search value, which is the GeoId of each row of the geo table
+   */
+  public static boolean rangeBinarySearch(List<Long[]> ranges, long searchForNumber) {
+    Long[] range;
+    int low = 0, mid, high = ranges.size() - 1;
+    while (low <= high) {
+      mid = low + ((high - low) / 2);
+      range = ranges.get(mid);
+      if (searchForNumber >= range[0]) {
+        if (searchForNumber <= range[1]) {
+          // Return true if the number is between min and max values of the range
+          return true;
+        } else {
+          // Number is bigger than this range's max. Search on the right side of the range
+          low = mid + 1;
+        }
+      } else {
+        // Number is smaller than this range's min. Search on the left side of the range
+        high = mid - 1;
+      }
+    }
+    return false;
+  }
+}
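The `colRow2GeoID` / `geoID2ColRow` pair added in this file implements a Morton (Z-order) interleaving: row bits occupy the odd bit positions of the GeoID and column bits the even positions, which is why `convertToUpperLayerGeoId` is simply `geoId >> 2`. A standalone sketch of that round trip (the `MortonDemo` class and method names are illustrative, not part of the patch):

```java
// Minimal sketch of the Morton (Z-order) interleaving used by
// GeoHashUtils.colRow2GeoID / geoID2ColRow: row bits go to odd
// bit positions, column bits to even bit positions.
public class MortonDemo {
  // Interleave row/column bits into a single GeoID.
  public static long encode(int row, int column) {
    long geoId = 0L;
    int bit = 0;
    long r = row, c = column;
    while (r > 0 || c > 0) {
      geoId |= ((r & 1) << (2 * bit + 1)) | ((c & 1) << (2 * bit));
      r >>= 1;
      c >>= 1;
      bit++;
    }
    return geoId;
  }

  // De-interleave a GeoID back into [row, column].
  public static int[] decode(long geoId) {
    int row = 0, column = 0, bit = 0;
    long source = geoId;
    while (source > 0) {
      column |= (source & 1) << bit;   // even bit -> column
      source >>= 1;
      row |= (source & 1) << bit;      // odd bit -> row
      source >>= 1;
      bit++;
    }
    return new int[] {row, column};
  }

  public static void main(String[] args) {
    long geoId = encode(3, 5);
    int[] rc = decode(geoId);
    System.out.println(geoId + " -> [" + rc[0] + ", " + rc[1] + "]");
  }
}
```

Shifting the GeoID right by two bits drops the lowest bit of both row and column, i.e. it yields the parent cell one pyramid level up, matching `convertToUpperLayerGeoId`.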
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java b/geo/src/main/java/org/apache/carbondata/geo/GeoOperationType.java
similarity index 65%
copy from geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
copy to geo/src/main/java/org/apache/carbondata/geo/GeoOperationType.java
index d9641cf..0c5b170 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoOperationType.java
@@ -17,13 +17,27 @@
 
 package org.apache.carbondata.geo;
 
-/**
- * Geo Constants
- */
-public class GeoConstants {
-  private GeoConstants() {
+public enum GeoOperationType {
+  OR("OR"),
+  AND("AND");
+
+  private String type;
+
+  private GeoOperationType(String type) {
+    this.type = type;
   }
 
-  // GeoHash type Spatial Index
-  public static final String GEOHASH = "geohash";
+  public static GeoOperationType getEnum(String type) {
+    for (GeoOperationType typeEnum : GeoOperationType.values()) {
+      if (typeEnum.type.equalsIgnoreCase(type)) {
+        return typeEnum;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return this.type;
+  }
 }
diff --git a/geo/src/main/java/org/apache/carbondata/geo/QuadTreeCls.java b/geo/src/main/java/org/apache/carbondata/geo/QuadTreeCls.java
index 86dfea6..eab93c7 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/QuadTreeCls.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/QuadTreeCls.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.geo;
 
 import java.awt.geom.Point2D;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -81,10 +82,19 @@ class GeometryOperation {
   /**
    * Converting point objects to point objects in Geo
    * @param pointB Point2D Point object
+   * @param isMaxDepth Whether this is the last division
+   * @param scale Scale applied to the point at the last division
    * @return JTS Point object
    */
-  public static Point getPointByPoint2D(Point2D.Double pointB) {
-    Coordinate point = new Coordinate(pointB.x, pointB.y);
+  public static Point getPointByPoint2D(Point2D.Double pointB, boolean isMaxDepth, int scale) {
+    Coordinate point;
+    if (isMaxDepth) {
+      point = new Coordinate(
+          new BigDecimal(pointB.x).setScale(scale, BigDecimal.ROUND_HALF_UP).doubleValue(),
+          new BigDecimal(pointB.y).setScale(scale, BigDecimal.ROUND_HALF_UP).doubleValue());
+    } else {
+      point = new Coordinate(pointB.x, pointB.y);
+    }
     return geoFactory.createPoint(point);
   }
 
@@ -104,10 +114,13 @@ class GeometryOperation {
    * A and B do not intersect each other, A is a polygon, B is a point
    * @param polygonA polygon
    * @param pointB point
+   * @param isMaxDepth Whether this is the last division
+   * @param scale Scale applied to the point at the last division
   * @return true Point away from polygon, false Points are inseparable from polygons
    */
-  public static boolean disjoint(Geometry polygonA, Point2D.Double pointB) {
-    Point pointGeo = getPointByPoint2D(pointB);
+  public static boolean disjoint(Geometry polygonA, Point2D.Double pointB,
+      boolean isMaxDepth, int scale) {
+    Point pointGeo = getPointByPoint2D(pointB, isMaxDepth, scale);
     boolean result = polygonA.disjoint(pointGeo);
     return result;
   }
@@ -129,10 +142,13 @@ class GeometryOperation {
    * contains - A contains B Compare whether polygon a contains B
    * @param polygonA  polygon
    * @param pointB   point
+   * @param isMaxDepth Whether this is the last division
+   * @param scale Scale applied to the point at the last division
   * @return true Polygon a contains point B (B in a), false Polygon a does not contain point B
    */
-  public static boolean contains(Geometry polygonA, Point2D.Double pointB) {
-    Point pointGeo = getPointByPoint2D(pointB);
+  public static boolean contains(Geometry polygonA, Point2D.Double pointB,
+      boolean isMaxDepth, int scale) {
+    Point pointGeo = getPointByPoint2D(pointB, isMaxDepth, scale);
     boolean result = polygonA.contains(pointGeo);
     return result;
   }
@@ -153,10 +169,13 @@ class GeometryOperation {
   * intersect - A intersects B Represents the intersection of polygon A and point B
   * @param polygonA polygon
   * @param pointB point
+   * @param isMaxDepth Whether this is the last division
+   * @param scale Scale applied to the point at the last division
   * @return true Polygon a intersects point B, false Polygon a does not intersect point B
    */
-  public static boolean intersects(Geometry polygonA, Point2D.Double pointB) {
-    Point pointGeo = getPointByPoint2D(pointB);
+  public static boolean intersects(Geometry polygonA, Point2D.Double pointB,
+      boolean isMaxDepth, int scale) {
+    Point pointGeo = getPointByPoint2D(pointB, isMaxDepth, scale);
     boolean result = polygonA.intersects(pointGeo);
     return result;
   }
@@ -362,13 +381,17 @@ class GridData {
   * and finally the hash value of longitude and latitude data should be brought in
    */
   public long createHashID(long row, long column) {
-    long index = 0L;
-    for (int i = 0; i < maxDepth + 1; i++) {
-      long x = (row >> i) & 1;    // take position i
-      long y = (column >> i) & 1;
-      index = index | (x << (2 * i + 1)) | (y << 2 * i);
+    long geoID = 0L;
+    int bit = 0;
+    long sourceRow = row;
+    long sourceColumn = column;
+    while (sourceRow > 0 || sourceColumn > 0) {
+      geoID = geoID | ((sourceRow & 1) << (2 * bit + 1)) | ((sourceColumn & 1) << 2 * bit);
+      sourceRow >>= 1;
+      sourceColumn >>= 1;
+      bit++;
     }
-    return index;
+    return geoID;
   }
 
   /**
@@ -411,6 +434,8 @@ class QuadNode {
   private int currentDepth;
   // Maximum depth
   private int maxDepth;
+  // Scale of point at last division
+  private int scale;
   // Hashid range, 0 min, 1 Max
   private QuadNode northWest = null;
   private QuadNode northEast = null;
@@ -435,12 +460,14 @@ class QuadNode {
    * @param grid         raster data
    * @param currentDepth Current depth
    * @param maxDepth     Maximum depth
+   * @param scale        Scale of point at last division
    */
-  public QuadNode(QuadRect rect, GridData grid, int currentDepth, int maxDepth) {
+  public QuadNode(QuadRect rect, GridData grid, int currentDepth, int maxDepth, int scale) {
     this.rect = rect;
     this.grid = grid;
     this.currentDepth = currentDepth;
     this.maxDepth = maxDepth;
+    this.scale = scale;
   }
 
   /**
@@ -493,7 +520,7 @@ class QuadNode {
       // Intersecting indicates partial selection, or they may not be selected when they are
       // the last node
       Point2D.Double middlePoint = this.rect.getMiddlePoint();
-      if (!GeometryOperation.disjoint(queryPolygon, middlePoint)) {
+      if (!GeometryOperation.disjoint(queryPolygon, middlePoint, true, 
this.scale)) {
         // Select this area and fill in the data range
         this.grid.setStatus(GridData.STATUS_ALL);
       } else {
@@ -527,8 +554,8 @@ class QuadNode {
         if (!GeometryOperation.disjoint(queryRect, topLeft) && !GeometryOperation
                                                .disjoint(queryPolygon, topLeft)) {
           // If they are not separated, select the upper left half of the mesh
-          GridData grid = new GridData(this.grid.startRow, gridRowMiddle, gridColumnMiddle,
-              this.grid.endColumn, this.maxDepth);
+          GridData grid = new GridData(gridRowMiddle, this.grid.endRow, this.grid.startColumn,
+              gridColumnMiddle, this.maxDepth);
           insertIntoChildren(ChildEnum.TOPLEFT, grid, topLeft, queryPolygon);
         }
         if (!GeometryOperation.disjoint(queryRect, topRight) && !GeometryOperation
@@ -548,8 +575,8 @@
         if (!GeometryOperation.disjoint(queryRect, bottomRight) && !GeometryOperation
                                                   .disjoint(queryPolygon, bottomRight)) {
           // If not, select the lower right half of the mesh
-          GridData grid = new GridData(gridRowMiddle, this.grid.endRow, this.grid.startColumn,
-              gridColumnMiddle, this.maxDepth);
+          GridData grid = new GridData(this.grid.startRow, gridRowMiddle, gridColumnMiddle,
+              this.grid.endColumn, this.maxDepth);
           insertIntoChildren(ChildEnum.BOTTOMRIGHT, grid, bottomRight, queryPolygon);
         }
         // When processing four children, it is necessary to judge whether all four children
@@ -574,19 +601,19 @@ class QuadNode {
     QuadRect rect = new QuadRect(rectangle.get(0), rectangle.get(2));
     switch (childType) {
       case TOPLEFT:
-        this.northWest = new QuadNode(rect, grid, currentDepth + 1, maxDepth);
+        this.northWest = new QuadNode(rect, grid, currentDepth + 1, maxDepth, scale);
         this.northWest.insert(queryPolygon);
         break;
       case TOPRIGHT:
-        this.northEast = new QuadNode(rect, grid, currentDepth + 1, maxDepth);
+        this.northEast = new QuadNode(rect, grid, currentDepth + 1, maxDepth, scale);
         this.northEast.insert(queryPolygon);
         break;
       case BOTTOMLEFT:
-        this.southWest = new QuadNode(rect, grid, currentDepth + 1, maxDepth);
+        this.southWest = new QuadNode(rect, grid, currentDepth + 1, maxDepth, scale);
         this.southWest.insert(queryPolygon);
         break;
       case BOTTOMRIGHT:
-        this.southEast = new QuadNode(rect, grid, currentDepth + 1, maxDepth);
+        this.southEast = new QuadNode(rect, grid, currentDepth + 1, maxDepth, scale);
         this.southEast.insert(queryPolygon);
         break;
       default:
@@ -750,14 +777,15 @@ public class QuadTreeCls {
    * @param down Lower left point of coordinate
    * @param width Width of area
    * @param height Height of area
+   * @param scale Scale of point at last division
    */
-  public QuadTreeCls(double left, double down, double width, double height, int depth) {
+  public QuadTreeCls(double left, double down, double width, double height, int depth, int scale) {
     QuadRect rect = new QuadRect(left, down,  width,  height);
     // Maximum column length based on depth
     int maxColumn = (int) Math.pow(2, depth);
     // write row,column data to grid use it to build hash id
     GridData grid = new GridData(0, maxColumn, 0, maxColumn, depth);
-    root = new QuadNode(rect, grid, 1, depth);
+    root = new QuadNode(rect, grid, 1, depth, scale);
     LOGGER.info("build quad tree successfully, the max column is " + maxColumn);
   }
 
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
index 1adf029..8f4e410 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.util.CustomIndex;
+import org.apache.carbondata.geo.GeoHashUtils;
 import org.apache.carbondata.geo.scan.filter.executor.PolygonFilterExecutorImpl;
 
 /**
@@ -47,9 +48,9 @@ import org.apache.carbondata.geo.scan.filter.executor.PolygonFilterExecutorImpl;
  */
 @InterfaceAudience.Internal
 public class PolygonExpression extends UnknownExpression implements ConditionalExpression {
-  private String polygon;
-  private CustomIndex<List<Long[]>> instance;
-  private List<Long[]> ranges = new ArrayList<Long[]>();
+  public String polygon;
+  public CustomIndex<List<Long[]>> instance;
+  public List<Long[]> ranges = new ArrayList<Long[]>();
   private ColumnExpression column;
   private static final ExpressionResult trueExpRes =
       new ExpressionResult(DataTypes.BOOLEAN, true);
@@ -62,23 +63,13 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
     this.column = new ColumnExpression(columnName, DataTypes.LONG);
   }
 
-  private void validate(List<Long[]> ranges) {
-    // Validate the ranges
-    for (Long[] range : ranges) {
-      if (range.length != 2) {
-        throw new RuntimeException("Query processor must return list of ranges with each range "
-            + "containing minimum and maximum values");
-      }
-    }
-  }
-
   /**
    * This method calls the query processor and gets the list of ranges of IDs.
    */
-  private void processExpression() {
+  public void processExpression() {
     try {
       ranges = instance.query(polygon);
-      validate(ranges);
+      GeoHashUtils.validateRangeList(ranges);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -88,31 +79,9 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
     return ranges;
   }
 
-  private boolean rangeBinarySearch(List<Long[]> ranges, long searchForNumber) {
-    Long[] range;
-    int low = 0, mid, high = ranges.size() - 1;
-    while (low <= high) {
-      mid = low + ((high - low) / 2);
-      range = ranges.get(mid);
-      if (searchForNumber >= range[0]) {
-        if (searchForNumber <= range[1]) {
-          // Return true if the number is between min and max values of the range
-          return true;
-        } else {
-          // Number is bigger than this range's min and max. Search on the right side of the range
-          low = mid + 1;
-        }
-      } else {
-        // Number is smaller than this range's min and max. Search on the left side of the range
-        high = mid - 1;
-      }
-    }
-    return false;
-  }
-
   @Override
   public ExpressionResult evaluate(RowIntf value) {
-    if (rangeBinarySearch(ranges, (Long) value.getVal(0))) {
+    if (GeoHashUtils.rangeBinarySearch(ranges, (Long) value.getVal(0))) {
       return trueExpRes;
     }
     return falseExpRes;
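The shared `GeoHashUtils.rangeBinarySearch` that replaces the private copy above relies on the query processor returning sorted, non-overlapping `[min, max]` ranges. A self-contained sketch of the same search (the `RangeSearchDemo` class name is illustrative, not part of the patch):

```java
// Sketch of the binary search over sorted, disjoint [min, max] ranges
// that rangeBinarySearch performs when evaluating a row's GeoID.
import java.util.Arrays;
import java.util.List;

public class RangeSearchDemo {
  public static boolean inRanges(List<Long[]> ranges, long searchForNumber) {
    int low = 0, high = ranges.size() - 1;
    while (low <= high) {
      int mid = low + (high - low) / 2;
      Long[] range = ranges.get(mid);
      if (searchForNumber < range[0]) {
        high = mid - 1;          // value below this range: search left half
      } else if (searchForNumber > range[1]) {
        low = mid + 1;           // value above this range: search right half
      } else {
        return true;             // min <= value <= max: hit
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<Long[]> ranges = Arrays.asList(
        new Long[] {5L, 10L}, new Long[] {20L, 30L}, new Long[] {42L, 42L});
    System.out.println(inRanges(ranges, 7) + " " + inRanges(ranges, 15));
  }
}
```

Because the ranges are disjoint and sorted, each row lookup costs O(log n) in the number of ranges rather than O(n).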
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
new file mode 100644
index 0000000..903b694
--- /dev/null
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo.scan.expression;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.util.CustomIndex;
+import org.apache.carbondata.geo.GeoConstants;
+import org.apache.carbondata.geo.GeoHashUtils;
+
+/**
+ * InPolygonList expression processor. It passes the InPolygonList string to the Geo
+ * implementation's query method, gets a list of ID ranges from each polygon, and
+ * calculates the and/or range list as the filter output. It then builds an
+ * InExpression with all the IDs present in those lists of ranges.
+ */
+@InterfaceAudience.Internal
+public class PolygonListExpression extends PolygonExpression {
+
+  private String opType;
+
+  public PolygonListExpression(String polygonListString, String opType, String columnName,
+                               CustomIndex indexInstance) {
+    super(polygonListString, columnName, indexInstance);
+    this.opType = opType;
+  }
+
+  @Override
+  public void processExpression() {
+    try {
+      // 1. parse the polygon list string
+      List<String> polygons = new ArrayList<>();
+      Pattern pattern = Pattern.compile(GeoConstants.POLYGON_REG_EXPRESSION);
+      Matcher matcher = pattern.matcher(polygon);
+      while (matcher.find()) {
+        String matchedStr = matcher.group();
+        polygons.add(matchedStr);
+      }
+      if (polygons.size() < 2) {
+        throw new RuntimeException("polygon list needs at least 2 polygons, actually has " +
+            polygons.size());
+      }
+      // 2. get the range list of each polygon
+      List<Long[]> processedRangeList = instance.query(polygons.get(0));
+      GeoHashUtils.validateRangeList(processedRangeList);
+      for (int i = 1; i < polygons.size(); i++) {
+        List<Long[]> tempRangeList = instance.query(polygons.get(i));
+        GeoHashUtils.validateRangeList(tempRangeList);
+        processedRangeList = GeoHashUtils.processRangeList(
+            processedRangeList, tempRangeList, opType);
+      }
+      ranges = processedRangeList;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String getStatement() {
+    return "IN_POLYGON_LIST('" + polygon + "', '" + opType + "')";
+  }
+}
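`GeoHashUtils.processRangeList` itself is not part of this excerpt. As a rough illustration only, an AND-style combine over two sorted, internally non-overlapping ID range lists could look like the following sketch (the class name, the array-based signature, and the exact semantics are assumptions, not CarbonData's API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: intersect two sorted, non-overlapping inclusive
// ID range lists, similar in spirit to an AND combine of per-polygon ranges.
public class RangeIntersect {
  // Each inner array is {start, end}, inclusive; both inputs sorted by start.
  public static long[][] and(long[][] left, long[][] right) {
    List<long[]> result = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.length && j < right.length) {
      long start = Math.max(left[i][0], right[j][0]);
      long end = Math.min(left[i][1], right[j][1]);
      if (start <= end) {
        result.add(new long[] {start, end});  // keep the overlapping part
      }
      // advance whichever current range ends first
      if (left[i][1] < right[j][1]) {
        i++;
      } else {
        j++;
      }
    }
    return result.toArray(new long[0][]);
  }
}
```

For example, intersecting {1..5, 10..20} with {3..12} yields {3..5, 10..12}.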
diff --git 
a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
 
b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
new file mode 100644
index 0000000..81efe05
--- /dev/null
+++ 
b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo.scan.expression;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.util.CustomIndex;
+import org.apache.carbondata.geo.GeoConstants;
+import org.apache.carbondata.geo.GeoHashUtils;
+
+/**
+ * InPolygonRangeList expression processor. It parses the InPolygonRangeList string
+ * into lists of ID ranges and combines them with the and/or/diff operation into the
+ * range list used for filtering. An InExpression is then built with all the IDs
+ * present in those ranges.
+ */
[email protected]
+public class PolygonRangeListExpression extends PolygonExpression {
+
+  private String opType;
+
+  public PolygonRangeListExpression(String polygonRangeList, String opType, 
String columnName,
+      CustomIndex indexInstance) {
+    super(polygonRangeList, columnName, indexInstance);
+    this.opType = opType;
+  }
+
+  @Override
+  public void processExpression() {
+    // 1. parse the range list string
+    List<String> rangeLists = new ArrayList<>();
+    Pattern pattern = Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION);
+    Matcher matcher = pattern.matcher(polygon);
+    while (matcher.find()) {
+      String matchedStr = matcher.group();
+      rangeLists.add(matchedStr);
+    }
+    // 2. process the range lists
+    if (rangeLists.size() > 0) {
+      List<Long[]> processedRangeList = 
getRangeListFromString(rangeLists.get(0));
+      for (int i = 1; i < rangeLists.size(); i++) {
+        List<Long[]> tempRangeList = getRangeListFromString(rangeLists.get(i));
+        processedRangeList = GeoHashUtils.processRangeList(
+            processedRangeList, tempRangeList, opType);
+      }
+      ranges = processedRangeList;
+    }
+  }
+
+  private void sortRange(List<Long[]> rangeList) {
+    rangeList.sort(new Comparator<Long[]>() {
+      @Override
+      public int compare(Long[] x, Long[] y) {
+        return Long.compare(x[0], y[0]);
+      }
+    });
+  }
+
+  private void combineRange(List<Long[]> rangeList) {
+    for (int i = 0, j = i + 1; i < rangeList.size() - 1; i++, j++) {
+      long previousEnd = rangeList.get(i)[1];
+      long nextStart = rangeList.get(j)[0];
+      long nextEnd = rangeList.get(j)[1];
+      if (previousEnd + 1 >= nextStart) {
+        rangeList.get(j)[0] = rangeList.get(i)[0];
+        rangeList.get(j)[1] = previousEnd >= nextEnd ? previousEnd : nextEnd;
+        rangeList.get(i)[0] = null;
+        rangeList.get(i)[1] = null;
+      }
+    }
+    rangeList.removeIf(item -> item[0] == null && item[1] == null);
+  }
+
+  private List<Long[]> getRangeListFromString(String rangeListString) {
+    String[] rangeStringList = 
rangeListString.trim().split(GeoConstants.DEFAULT_DELIMITER);
+    List<Long[]> rangeList = new ArrayList<>();
+    for (String rangeString : rangeStringList) {
+      String[] range = GeoHashUtils.splitStringToPoint(rangeString);
+      if (range.length != 2) {
+        throw new RuntimeException("each range must be a pair of exactly 2 values");
+      }
+      Long[] rangeMinMax;
+      try {
+        rangeMinMax = new Long[]{Long.valueOf(range[0]), 
Long.valueOf(range[1])};
+      } catch (NumberFormatException e) {
+        throw new RuntimeException("cannot convert the range from String to Long", e);
+      }
+      if (rangeMinMax[0] > rangeMinMax[1]) {
+        throw new RuntimeException(
+            "first value of each range must not be greater than the second value");
+      } else {
+        rangeList.add(rangeMinMax);
+      }
+    }
+    sortRange(rangeList);
+    combineRange(rangeList);
+    return rangeList;
+  }
+
+  @Override
+  public String getStatement() {
+    return "IN_POLYGON_RANGE_LIST('" + polygon + "', '" + opType + "')";
+  }
+}
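The `sortRange`/`combineRange` pair above is the standard sort-then-merge of overlapping or adjacent intervals. A condensed standalone sketch of the same idea (array-based for brevity; not the class's actual API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch of the sort-then-combine step used by PolygonRangeListExpression:
// sort ranges by start, then merge any pair that overlaps or is adjacent
// (previousEnd + 1 >= nextStart), as combineRange does above.
public class RangeMerge {
  public static long[][] merge(long[][] ranges) {
    long[][] sorted = ranges.clone();
    Arrays.sort(sorted, Comparator.comparingLong((long[] r) -> r[0]));
    List<long[]> merged = new ArrayList<>();
    for (long[] range : sorted) {
      if (!merged.isEmpty() && merged.get(merged.size() - 1)[1] + 1 >= range[0]) {
        // overlapping or adjacent: extend the previous range instead of adding
        long[] prev = merged.get(merged.size() - 1);
        prev[1] = Math.max(prev[1], range[1]);
      } else {
        merged.add(new long[] {range[0], range[1]});
      }
    }
    return merged.toArray(new long[0][]);
  }
}
```

For example, {5..10, 1..3, 2..4} sorts to {1..3, 2..4, 5..10} and merges to the single range {1..10}, since 4 and 5 are adjacent.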
diff --git 
a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
 
b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
new file mode 100644
index 0000000..70ea7f3
--- /dev/null
+++ 
b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo.scan.expression;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.util.CustomIndex;
+import org.apache.carbondata.geo.GeoConstants;
+import org.apache.carbondata.geo.GeoHashUtils;
+import org.apache.carbondata.geo.GeoOperationType;
+
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.Polygon;
+import org.locationtech.jts.io.WKTReader;
+import org.locationtech.jts.operation.buffer.BufferParameters;
+
+/**
+ * InPolylineList expression processor. It buffers each polyline of the InPolylineList
+ * string into a polygon, passes each polygon to the Geo implementation's query method,
+ * gets a list of ID ranges for each one, and unions them into the range list used for
+ * filtering. An InExpression is then built with all the IDs present in those ranges.
+ */
[email protected]
+public class PolylineListExpression extends PolygonExpression {
+
+  private Float bufferInMeter;
+
+  public PolylineListExpression(String polylineString, Float bufferInMeter, 
String columnName,
+      CustomIndex indexInstance) {
+    super(polylineString, columnName, indexInstance);
+    this.bufferInMeter = bufferInMeter;
+  }
+
+  @Override
+  public void processExpression() {
+    try {
+      // transform the distance unit meter to degree
+      double buffer = bufferInMeter / 
GeoConstants.CONVERSION_FACTOR_OF_METER_TO_DEGREE;
+
+      // 1. parse the polyline list string and get polygon from each polyline
+      List<Geometry> polygonList = new ArrayList<>();
+      WKTReader wktReader = new WKTReader();
+      Pattern pattern = Pattern.compile(GeoConstants.POLYLINE_REG_EXPRESSION);
+      Matcher matcher = pattern.matcher(polygon);
+      while (matcher.find()) {
+        String matchedStr = matcher.group();
+        LineString polylineCreatedFromStr = (LineString) 
wktReader.read(matchedStr);
+        Polygon polygonFromPolylineBuffer = (Polygon) 
polylineCreatedFromStr.buffer(
+            buffer, 0, BufferParameters.CAP_SQUARE);
+        polygonList.add(polygonFromPolylineBuffer);
+      }
+      // 2. get the range list of each polygon
+      if (polygonList.size() > 0) {
+        List<double[]> pointList = 
getPointListFromGeometry(polygonList.get(0));
+        List<Long[]> processedRangeList = instance.query(pointList);
+        GeoHashUtils.validateRangeList(processedRangeList);
+        for (int i = 1; i < polygonList.size(); i++) {
+          List<double[]> tempPointList = 
getPointListFromGeometry(polygonList.get(i));
+          List<Long[]> tempRangeList = instance.query(tempPointList);
+          GeoHashUtils.validateRangeList(tempRangeList);
+          processedRangeList = GeoHashUtils.processRangeList(
+            processedRangeList, tempRangeList, GeoOperationType.OR.toString());
+        }
+        ranges = processedRangeList;
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private List<double[]> getPointListFromGeometry(Geometry geometry) {
+    List<double[]> pointList = new ArrayList<>();
+    Coordinate[] coords = geometry.getCoordinates();
+    for (Coordinate coord : coords) {
+      pointList.add(new double[] {coord.x, coord.y});
+    }
+    return pointList;
+  }
+
+  @Override
+  public String getStatement() {
+    return "IN_POLYLINE_LIST('" + polygon + "', '" + bufferInMeter + "')";
+  }
+}
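The value of `GeoConstants.CONVERSION_FACTOR_OF_METER_TO_DEGREE` is not visible in this excerpt. Assuming it is meters per degree of arc on a sphere, with the 6371004 m mean radius implied by the `getDeltaX`/`getDeltaY` test values added later in this commit, the buffer conversion amounts to:

```java
// Sketch of the meter-to-degree conversion done before buffering a polyline.
// Assumption: the conversion factor is meters per degree of arc on a sphere;
// the 6371004 m mean radius is inferred from this commit's deltaX/deltaY test
// values, not read from GeoConstants itself.
public class MeterToDegree {
  static final double EARTH_MEAN_RADIUS_M = 6371004.0;  // assumed value
  static final double METERS_PER_DEGREE =
      2 * Math.PI * EARTH_MEAN_RADIUS_M / 360.0;        // about 111195 m/degree

  public static double toDegrees(double meters) {
    return meters / METERS_PER_DEGREE;
  }
}
```

With these assumptions, `toDegrees(50)` is about 4.4966e-4 degrees, consistent with `testGetDeltaX` below.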
diff --git 
a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
 
b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
index c5a7317..ffb11d7 100644
--- 
a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
+++ 
b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
@@ -75,6 +75,10 @@ public class PolygonFilterExecutorImpl extends 
RowLevelFilterExecutorImpl {
   private boolean isScanRequired(byte[] maxValue, byte[] minValue) {
     PolygonExpression polygon = (PolygonExpression) exp;
     List<Long[]> ranges = polygon.getRanges();
+    if (ranges.isEmpty()) {
+      // If the ranges list is empty, no need to scan the block or blocklet
+      return false;
+    }
     Long min =
         (Long) 
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minValue, 
DataTypes.LONG);
     Long max =
@@ -90,8 +94,10 @@ public class PolygonFilterExecutorImpl extends 
RowLevelFilterExecutorImpl {
     }
     // Got same index for both min and max values.
     Long[] oneRange = ranges.subList(startIndex, endIndex + 1).get(0);
-    if ((min >= oneRange[0] && min <= oneRange[1]) || (max >= oneRange[0] && 
max <= oneRange[1])) {
+    if ((min >= oneRange[0] && min <= oneRange[1]) || (max >= oneRange[0] && 
max <= oneRange[1]) ||
+        (oneRange[0] >= min && oneRange[0] <= max) || (oneRange[1] >= min && 
oneRange[1] <= max)) {
       // Either min or max is within the range
+      // or either end of the range lies within [min, max]
       return true;
     }
     // No range between min and max values. Scan can be avoided for this block 
or blocklet
@@ -101,7 +107,6 @@ public class PolygonFilterExecutorImpl extends 
RowLevelFilterExecutorImpl {
   @Override
   public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue,
       boolean[] isMinMaxSet) {
-    assert (exp instanceof PolygonExpression);
     int dimIndex = dimensionChunkIndex[0];
     BitSet bitSet = new BitSet(1);
     if (isMinMaxSet[dimIndex] && isScanRequired(blockMaxValue[dimIndex], 
blockMinValue[dimIndex])) {
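The four containment checks in the patched condition are, for well-formed closed intervals, equivalent to the usual two-comparison overlap test, which may be easier to reason about:

```java
// [min, max] and [r0, r1] are closed intervals with min <= max and r0 <= r1.
// They overlap iff each interval starts no later than the other one ends.
public class IntervalOverlap {
  public static boolean overlaps(long min, long max, long r0, long r1) {
    return min <= r1 && r0 <= max;
  }

  // The patch's four-case form, kept here only to compare against.
  public static boolean overlapsFourCase(long min, long max, long r0, long r1) {
    return (min >= r0 && min <= r1) || (max >= r0 && max <= r1)
        || (r0 >= min && r0 <= max) || (r1 >= min && r1 <= max);
  }
}
```

Both forms return true when the range partially overlaps, fully contains, or is fully contained in [min, max], and false only when the two intervals are disjoint.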
diff --git a/geo/src/test/java/org/apache/carbondata/geo/GeoHashUtilsTest.java 
b/geo/src/test/java/org/apache/carbondata/geo/GeoHashUtilsTest.java
new file mode 100644
index 0000000..5e25944
--- /dev/null
+++ b/geo/src/test/java/org/apache/carbondata/geo/GeoHashUtilsTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo;
+
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * GeoHashUtils Tester.
+ */
+public class GeoHashUtilsTest {
+
+  /**
+   * test normal transform from lon,lat to GeoID, with gridSize 50.
+   */
+  @Test
+  public void testLonLat2GeoId() {
+    long geoID = GeoHashUtils.lonLat2GeoID(160000000L, 40000000L, 39.9124, 50);
+    TestCase.assertEquals(902594178014L, geoID);
+  }
+
+  /**
+   * test transform from lon=0,lat=0 to GeoID, with gridSize 50.
+   */
+  @Test
+  public void test00LonLat2GeoId() {
+    long geoID = GeoHashUtils.lonLat2GeoID(0, 0, 0, 50);
+    TestCase.assertEquals((long) Math.pow(2, 39) | (long) Math.pow(2, 38), 
geoID);
+  }
+
+  /**
+   * test transform from minimum lon,lat to GeoID, with gridSize 50.
+   */
+  @Test
+  public void testMinimalLonLat2GeoId() {
+    long geoID = GeoHashUtils.lonLat2GeoID(-235751615L, -235751615L, 0, 50);
+    TestCase.assertEquals(0, geoID);
+  }
+
+  /**
+   * test transform from maximum lon,lat to GeoID, with gridSize 50.
+   */
+  @Test
+  public void testMaxLonLat2GeoId() {
+    long geoID = GeoHashUtils.lonLat2GeoID(235751615L, 235751615L, 0, 50);
+    TestCase.assertEquals((long) Math.pow(2, 40) - 1, geoID);
+  }
+
+  /**
+   * test normal transform from lon,lat to GeoID, with gridSize 5.
+   */
+  @Test
+  public void testLonLat2GeoIdWithGridSize5() {
+    long geoID = GeoHashUtils.lonLat2GeoID(160000000L, 40000000L, 39.9124, 5);
+    TestCase.assertEquals(58152885790335L, geoID);
+  }
+
+  /**
+   * test transform from lon=0,lat=0 to GeoID, with gridSize 5.
+   */
+  @Test
+  public void test00LonLat2GeoIdWithGridSize5() {
+    long geoID = GeoHashUtils.lonLat2GeoID(0, 0, 0, 5);
+    TestCase.assertEquals((long) Math.pow(2, 45) | (long) Math.pow(2, 44), 
geoID);
+  }
+
+  /**
+   * test transform from minimum lon,lat to GeoID, with gridSize 5.
+   */
+  @Test
+  public void testMinimalLonLat2GeoIdWithGridSize5() {
+    long geoID = GeoHashUtils.lonLat2GeoID(-188601292L, -188601292L, 0, 5);
+    TestCase.assertEquals(0, geoID);
+  }
+
+  /**
+   * test transform from maximum lon,lat to GeoID, with gridSize 5.
+   */
+  @Test
+  public void testMaxLonLat2GeoIdWithGridSize5() {
+    long geoID = GeoHashUtils.lonLat2GeoID(188601292L, 188601292L, 0, 5);
+    TestCase.assertEquals((long) (Math.pow(2, 46) - 1), geoID);
+  }
+
+  /**
+   * test normal transform from lon,lat to grid coordinate, with gridSize 50.
+   */
+  @Test
+  public void testLonLat2ColRow() {
+    int[] coordinate = GeoHashUtils.lonLat2ColRow(16000000000L, 4000000000L, 
39.9124, 50);
+    TestCase.assertEquals(613243, coordinate[0]);
+    TestCase.assertEquals(797214, coordinate[1]);
+  }
+
+  /**
+   * test transform from lon=0,lat=0 to grid coordinate, with gridSize 50.
+   */
+  @Test
+  public void test00LonLat2ColRow() {
+    int[] coordinate = GeoHashUtils.lonLat2ColRow(0, 0, 0, 50);
+    TestCase.assertEquals(1 << 19, coordinate[0]);
+    TestCase.assertEquals(1 << 19, coordinate[1]);
+  }
+
+  /**
+   * test transform from minimum lon,lat to grid coordinate, with gridSize 50.
+   */
+  @Test
+  public void testMinimalLonLat2ColRow() {
+    int[] coordinate = GeoHashUtils.lonLat2ColRow(-23575161504L, 
-23575161504L, 0, 50);
+    TestCase.assertEquals(0, coordinate[0]);
+    TestCase.assertEquals(0, coordinate[1]);
+  }
+
+  /**
+   * test transform from minimum lon,lat to grid coordinate, with gridSize 5.
+   */
+  @Test
+  public void testMinimalLonLat2ColRowWithGridSize5() {
+    int[] coordinate = GeoHashUtils.lonLat2ColRow(-18860129203L, 
-18860129203L, 0, 5);
+    TestCase.assertEquals(0, coordinate[0]);
+    TestCase.assertEquals(0, coordinate[1]);
+  }
+
+  /**
+   * test transform from maximum lon,lat to grid coordinate, with gridSize 50.
+   */
+  @Test
+  public void testMaxLonLat2ColRow() {
+    int[] coordinate = GeoHashUtils.lonLat2ColRow(23575161504L, 23575161504L, 
0, 50);
+    TestCase.assertEquals((1 << 20) - 1, coordinate[0]);
+    TestCase.assertEquals((1 << 20) - 1, coordinate[1]);
+  }
+
+  /**
+   * test normal transform from GeoID to grid coordinate.
+   */
+  @Test
+  public void testGeoID2ColRow() {
+    int[] coordinate = GeoHashUtils.geoID2ColRow(975929064943L);
+    TestCase.assertEquals(880111, coordinate[0]);
+    TestCase.assertEquals(613243, coordinate[1]);
+  }
+
+  /**
+   * test transform from GeoID(lon=0,lat=0) to grid coordinate.
+   */
+  @Test
+  public void test00GeoID2ColRow() {
+    int[] coordinate = GeoHashUtils.geoID2ColRow((long) Math.pow(2, 39) | 
(long) Math.pow(2, 38));
+    TestCase.assertEquals(1 << 19, coordinate[0]);
+    TestCase.assertEquals(1 << 19, coordinate[1]);
+  }
+
+  /**
+   * test transform from GeoID(0) to grid coordinate.
+   */
+  @Test
+  public void testMinGeoID2ColRow() {
+    int[] coordinate = GeoHashUtils.geoID2ColRow(0);
+    TestCase.assertEquals(0, coordinate[0]);
+    TestCase.assertEquals(0, coordinate[1]);
+  }
+
+  /**
+   * test transform from maximum GeoID to grid coordinate.
+   */
+  @Test
+  public void testMaxGeoID2ColRow() {
+    int[] coordinate = GeoHashUtils.geoID2ColRow((long) Math.pow(2, 40) - 1);
+    TestCase.assertEquals((1 << 20) - 1, coordinate[0]);
+    TestCase.assertEquals((1 << 20) - 1, coordinate[1]);
+  }
+
+  /**
+   * test normal transform from grid coordinate to GeoID.
+   */
+  @Test
+  public void testColRow2GeoID() {
+    long geoID = GeoHashUtils.colRow2GeoID(880111, 613243);
+    TestCase.assertEquals(975929064943L, geoID);
+  }
+
+  /**
+   * test transform from center grid coordinate to GeoID.
+   */
+  @Test
+  public void test00ColRow2GeoID() {
+    long geoID = GeoHashUtils.colRow2GeoID(1 << 19, 1 << 19);
+    TestCase.assertEquals((long) Math.pow(2, 39) | (long) Math.pow(2, 38), 
geoID);
+  }
+
+  /**
+   * test transform from minimal grid coordinate to GeoID.
+   */
+  @Test
+  public void testMinColRow2GeoID() {
+    long geoID = GeoHashUtils.colRow2GeoID(0, 0);
+    TestCase.assertEquals(0, geoID);
+  }
+
+  /**
+   * test transform from maximum grid coordinate to GeoID.
+   */
+  @Test
+  public void testMaxColRow2GeoID() {
+    long geoID = GeoHashUtils.colRow2GeoID((1 << 20) - 1, (1 << 20) - 1);
+    TestCase.assertEquals((long) Math.pow(2, 40) - 1, geoID);
+  }
+
+  /**
+   * test get cut times for gridSize = 50 and origin lat = 0
+   */
+  @Test
+  public void testGetCutCount50() {
+    TestCase.assertEquals(20, GeoHashUtils.getCutCount(50, 0));
+  }
+
+  /**
+   * test get cut times for gridSize = 50 and origin lat = 39.9124
+   */
+  @Test
+  public void testGetCutCount39() {
+    TestCase.assertEquals(20, GeoHashUtils.getCutCount(50, 39.9124));
+  }
+
+  /**
+   * test get cut times for gridSize = 5 and origin lat = 0
+   */
+  @Test
+  public void testGetCutCount5() {
+    TestCase.assertEquals(23, GeoHashUtils.getCutCount(5, 0));
+  }
+
+  /**
+   * test get deltaX for gridSize = 50 and origin = 0
+   */
+  @Test
+  public void testGetDeltaX() {
+    TestCase.assertEquals(4.496605206422907E-4, GeoHashUtils.getDeltaX(0, 50));
+  }
+
+  /**
+   * test get deltaY for gridSize = 50 and origin = 0
+   */
+  @Test
+  public void testGetDeltaY() {
+    TestCase.assertEquals(4.496605206422907E-4, GeoHashUtils.getDeltaY(50));
+  }
+}
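The `colRow2GeoID`/`geoID2ColRow` expectations above are consistent with a plain Z-order (Morton) interleaving of 20 column bits and 20 row bits, with the column bit taking the higher position of each pair. A standalone sketch (not the production code) that reproduces the test values:

```java
// Sketch of the bit interleaving that the colRow2GeoID tests above imply:
// 20 column bits and 20 row bits woven into one 40-bit GeoID, column bits at
// the odd (higher) positions. This is an illustration, not CarbonData's code.
public class MortonSketch {
  public static long interleave(int col, int row) {
    long geoId = 0;
    for (int i = 0; i < 20; i++) {
      geoId |= ((long) ((col >> i) & 1)) << (2 * i + 1);  // column bit of pair i
      geoId |= ((long) ((row >> i) & 1)) << (2 * i);      // row bit of pair i
    }
    return geoId;
  }
}
```

For instance, `interleave(880111, 613243)` produces 975929064943, the value asserted in `testColRow2GeoID`, and the all-zero, center, and all-one grid coordinates map to the boundary GeoIDs used in the other tests.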
diff --git a/geo/src/test/java/org/apache/carbondata/geo/QuadTreeClsTest.java 
b/geo/src/test/java/org/apache/carbondata/geo/QuadTreeClsTest.java
index 0eecbae..cb79209 100644
--- a/geo/src/test/java/org/apache/carbondata/geo/QuadTreeClsTest.java
+++ b/geo/src/test/java/org/apache/carbondata/geo/QuadTreeClsTest.java
@@ -17,13 +17,16 @@
 
 package org.apache.carbondata.geo;
 
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
 import java.util.List;
 
-
 /**
  * QuadTreeClsTest Tester.
  */
@@ -36,7 +39,7 @@ public class QuadTreeClsTest {
 
     @Before
     public void before() throws Exception {
-        qtreee = new QuadTreeCls(0, 0, 16, 16, 4 );
+        qtreee = new QuadTreeCls(0, 0, 16, 16, 4, 6);
     }
 
     @After
@@ -284,9 +287,9 @@ public class QuadTreeClsTest {
     }
 
     /**
-     * Initial result 120->120  123->123  122->122  97->97  99->99  102->102  
108->111  104->107  192->207  208->208  210->210  216->216  225->225  228->228  
229->229  151->151  157->157  159->159  158->158
-     * Results after sorting 97->97  99->99  102->102  104->107  108->111  
120->120  122->122  123->123  151->151  157->157  158->158  159->159  192->207  
208->208  210->210  216->216  225->225  228->228  229->229
-     * Combined results 97->97  99->99  102->102  104->111  120->120  122->123 
 151->151  157->158  159->159  192->208  210->210  216->216  225->225  228->229
+     * Initial result 180->180  183->183  181->181  146->146  147->147  
153->153  156->159  148->151  192->207  224->224  225->225  228->228  210->210  
216->216  218->218  107->107  110->110  111->111  109->109
+     * Results after sorting 107->107  109->109  110->110  111->111  146->146  
147->147  148->151  153->153  156->159  180->180  181->181  183->183  192->207  
210->210  216->216  218->218  224->224  225->225  228->228
+     * Combined results 107->107  109->111  146->151  153->153  156->159  
180->181  183->183  192->207  210->210  216->216  218->218  224->225  228->228
      * @throws Exception
      */
     @Test
@@ -300,22 +303,22 @@ public class QuadTreeClsTest {
         qtreee.insert(pointList);
         List<Long[]> data = qtreee.getNodesData();
 
-        // 97->97  99->99  102->102  104->111  120->120  122->123  151->151  
157->159  192->208  210->210  216->216  225->225  228->229
-
-        Assume.assumeTrue(checkValidate(data, 0, 97, 97));
-        Assume.assumeTrue(checkValidate(data, 1, 99, 99));
-        Assume.assumeTrue(checkValidate(data, 2, 102, 102));
-        Assume.assumeTrue(checkValidate(data, 3, 104, 111));
-        Assume.assumeTrue(checkValidate(data, 4, 120, 120));
-        Assume.assumeTrue(checkValidate(data, 5, 122, 123));
-
-        Assume.assumeTrue(checkValidate(data, 6, 151, 151));
-        Assume.assumeTrue(checkValidate(data, 7, 157, 159));
-        Assume.assumeTrue(checkValidate(data, 8, 192, 208));
-        Assume.assumeTrue(checkValidate(data, 9, 210, 210));
-        Assume.assumeTrue(checkValidate(data, 10, 216, 216));
-        Assume.assumeTrue(checkValidate(data, 11, 225, 225));
-        Assume.assumeTrue(checkValidate(data, 12, 228, 229));
+        // 107->107  109->111  146->151  153->153  156->159  180->181  
183->183  192->207  210->210  216->216  218->218  224->225  228->228
+
+        Assume.assumeTrue(checkValidate(data, 0, 107, 107));
+        Assume.assumeTrue(checkValidate(data, 1, 109, 111));
+        Assume.assumeTrue(checkValidate(data, 2, 146, 151));
+        Assume.assumeTrue(checkValidate(data, 3, 153, 153));
+        Assume.assumeTrue(checkValidate(data, 4, 156, 159));
+        Assume.assumeTrue(checkValidate(data, 5, 180, 181));
+
+        Assume.assumeTrue(checkValidate(data, 6, 183, 183));
+        Assume.assumeTrue(checkValidate(data, 7, 192, 207));
+        Assume.assumeTrue(checkValidate(data, 8, 210, 210));
+        Assume.assumeTrue(checkValidate(data, 9, 216, 216));
+        Assume.assumeTrue(checkValidate(data, 10, 218, 218));
+        Assume.assumeTrue(checkValidate(data, 11, 224, 225));
+        Assume.assumeTrue(checkValidate(data, 12, 228, 228));
     }
 
     private boolean checkValidate(List<Long[]> data, int index, int start, int 
end) {
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUdfRegister.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUdfRegister.scala
new file mode 100644
index 0000000..fb38138
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUdfRegister.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+object GeoUdfRegister {
+
+  val LOGGER = 
LogServiceFactory.getLogService(GeoUdfRegister.getClass.getCanonicalName)
+
+  def registerUtilUdf(sparkSession: SparkSession): Unit = {
+    try {
+      LOGGER.info("starting the registration of geo util udf")
+      GeoUtilUDFs.registerUDFs(sparkSession)
+      LOGGER.info("finished the registration of geo util udf")
+    } catch {
+      case e: Throwable =>
+        LOGGER.error(e)
+    }
+  }
+
+  def registerQueryFilterUdf(sparkSession: SparkSession): Unit = {
+    try {
+      LOGGER.info("starting the registration of geo query filter udf")
+      GeoFilterUDFs.registerUDFs(sparkSession)
+      LOGGER.info("finished the registration of geo query filter udf")
+    } catch {
+      case e: Throwable =>
+        LOGGER.error(e)
+    }
+  }
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
new file mode 100644
index 0000000..a2e7a42
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+
+object GeoUtilUDFs {
+
+  def registerUDFs(sparkSession: SparkSession): Unit = {
+    sparkSession.udf.register("GeoIdToGridXy", new GeoIdToGridXyUDF)
+    sparkSession.udf.register("GeoIdToLatLng", new GeoIdToLatLngUDF)
+    sparkSession.udf.register("LatLngToGeoId", new LatLngToGeoIdUDF)
+    sparkSession.udf.register("ToUpperLayerGeoId", new ToUpperLayerGeoIdUDF)
+    sparkSession.udf.register("ToRangeList", new ToRangeListUDF)
+  }
+}
+
+class GeoIdToGridXyUDF extends (Long => Array[Int]) with Serializable {
+  override def apply(geoId: Long): Array[Int] = {
+    GeoHashUtils.geoID2ColRow(geoId)
+  }
+}
+
+class GeoIdToLatLngUDF extends ((Long, Double, Int) => Array[Double]) with 
Serializable {
+  override def apply(geoId: Long, oriLatitude: Double, gridSize: Int): 
Array[Double] = {
+    GeoHashUtils.geoID2LatLng(geoId, oriLatitude, gridSize)
+  }
+}
+
+class LatLngToGeoIdUDF extends ((Long, Long, Double, Int) => Long) with 
Serializable {
+  override def apply(latitude: Long, longitude: Long, oriLatitude: Double, 
gridSize: Int): Long = {
+    GeoHashUtils.lonLat2GeoID(longitude, latitude, oriLatitude, gridSize)
+  }
+}
+
+class ToUpperLayerGeoIdUDF extends (Long => Long) with Serializable {
+  override def apply(geoId: Long): Long = {
+    GeoHashUtils.convertToUpperLayerGeoId(geoId)
+  }
+}
+
+class ToRangeListUDF extends ((String, Double, Int) => 
mutable.Buffer[Array[Long]])
+  with Serializable {
+  override def apply(polygon: String, oriLatitude: Double,
+                     gridSize: Int): mutable.Buffer[Array[Long]] = {
+    GeoHashUtils.getRangeList(polygon, oriLatitude, 
gridSize).asScala.map(_.map(Long2long))
+  }
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
index b73bf04..b423978 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
@@ -17,10 +17,21 @@
 
 package org.apache.carbondata.geo
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.sources.Filter
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
 
+object GeoFilterUDFs {
+
+  def registerUDFs(sparkSession: SparkSession): Unit = {
+    sparkSession.udf.register("in_polygon", new InPolygonUDF)
+    sparkSession.udf.register("in_polygon_list", new InPolygonListUDF)
+    sparkSession.udf.register("in_polyline_list", new InPolylineListUDF)
+    sparkSession.udf.register("in_polygon_range_list", new 
InPolygonRangeListUDF)
+  }
+}
+
 @InterfaceAudience.Internal
 class InPolygonUDF extends (String => Boolean) with Serializable {
   override def apply(v1: String): Boolean = {
@@ -29,7 +40,42 @@ class InPolygonUDF extends (String => Boolean) with 
Serializable {
 }
 
 @InterfaceAudience.Internal
+class InPolygonListUDF extends ((String, String) => Boolean) with Serializable 
{
+  override def apply(v1: String, v2: String): Boolean = {
+    true // Carbon applies the filter. So, Spark does not have to apply it.
+  }
+}
+
[email protected]
+class InPolylineListUDF extends ((String, Float) => Boolean) with Serializable 
{
+  override def apply(v1: String, v2: Float): Boolean = {
+    true // Carbon applies the filter. So, Spark does not have to apply it.
+  }
+}
+
[email protected]
+class InPolygonRangeListUDF extends ((String, String) => Boolean) with Serializable {
+  override def apply(v1: String, v2: String): Boolean = {
+    true // Carbon applies the filter, so Spark does not have to apply it again.
+  }
+}
+
[email protected]
 case class InPolygon(queryString: String) extends Filter {
   override def references: Array[String] = Array()
 }
 
[email protected]
+case class InPolygonList(polygonListString: String, opType: String) extends Filter {
+  override def references: Array[String] = Array()
+}
+
[email protected]
+case class InPolylineList(polylineListString: String, buffer: String) extends Filter {
+  override def references: Array[String] = Array()
+}
+
[email protected]
+case class InPolygonRangeList(rangeListString: String, opType: String) extends Filter {
+  override def references: Array[String] = Array()
+}
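The `Filter` case classes above pair with the always-true UDFs: Spark plans the UDF call, but the strategy layer translates the recognized `ScalaUDF` into one of these filter descriptors and pushes it down to Carbon, so the UDF body itself is never the real predicate. A minimal Python sketch of that "dummy UDF plus pushed-down filter" pattern (all names here are illustrative, not Carbon's API):

```python
# Sketch of the pushdown pattern used above: the engine-facing function always
# returns True, while the real predicate is a descriptor built from the call's
# arguments and handed to the storage layer (hypothetical names throughout).

class InPolygonListFilter:
    """Descriptor handed to the storage layer, cf. InPolygonList above."""
    def __init__(self, polygon_list: str, op_type: str):
        self.polygon_list = polygon_list
        self.op_type = op_type

def translate_filter(udf_name: str, args: list):
    """Mirror of a translateFilter step: map a recognized UDF call to a descriptor."""
    if udf_name == "in_polygon_list":
        if len(args) != 2:
            raise ValueError("Expect two strings in polygon list")
        return InPolygonListFilter(args[0], args[1])
    return None  # not recognized: no pushdown, the engine evaluates it normally
```

Because the descriptor, not the UDF body, reaches the storage layer, Carbon can prune blocks by geohash ranges instead of evaluating geometry per row.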
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 48d8fd9..071c841 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -39,7 +39,7 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetad
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.events._
-import org.apache.carbondata.geo.InPolygonUDF
+import org.apache.carbondata.geo.GeoUdfRegister
 import org.apache.carbondata.index.{TextMatchMaxDocUDF, TextMatchUDF}
 import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
 LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, 
LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
@@ -95,7 +95,12 @@ class CarbonEnv {
     // TODO: move it to proper place, it should be registered by indexSchema implementation
     sparkSession.udf.register("text_match", new TextMatchUDF)
     sparkSession.udf.register("text_match_with_limit", new TextMatchMaxDocUDF)
-    sparkSession.udf.register("in_polygon", new InPolygonUDF)
+
+    // register udf for spatial index filters of querying
+    GeoUdfRegister.registerQueryFilterUdf(sparkSession)
+
+    // register udf for spatial index utils
+    GeoUdfRegister.registerUtilUdf(sparkSession)
 
     // register udf for materialized view
     sparkSession.udf.register(MVFunctions.DUMMY_FUNCTION, () => "")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4b062e9..a3915bf 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -56,7 +56,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.geo.{InPolygon, InPolygonUDF}
+import org.apache.carbondata.geo.{InPolygon, InPolygonList, InPolygonListUDF, InPolygonRangeList, InPolygonRangeListUDF, InPolygonUDF, InPolylineList, InPolylineListUDF}
 import org.apache.carbondata.index.{TextMatch, TextMatchLimit, TextMatchMaxDocUDF, TextMatchUDF}
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
@@ -427,7 +427,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     // level filter at carbon and return the rows directly.
     if (candidatePredicates
       .exists(exp => exp.isInstanceOf[ScalaUDF] &&
-                     exp.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonUDF])) {
+        (exp.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonUDF] ||
+          exp.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonListUDF] ||
+          exp.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolylineListUDF] ||
+          exp.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonRangeListUDF]))) {
       vectorPushRowFilters = true
     }
 
@@ -825,6 +828,24 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         }
         Some(InPolygon(u.children.head.toString()))
 
+      case u: ScalaUDF if u.function.isInstanceOf[InPolygonListUDF] =>
+        if (u.children.size != 2) {
+          throw new MalformedCarbonCommandException("Expect two strings in polygon list")
+        }
+        Some(InPolygonList(u.children.head.toString(), u.children.last.toString()))
+
+      case u: ScalaUDF if u.function.isInstanceOf[InPolylineListUDF] =>
+        if (u.children.size != 2) {
+          throw new MalformedCarbonCommandException("Expect two strings in polyline list")
+        }
+        Some(InPolylineList(u.children.head.toString(), u.children.last.toString()))
+
+      case u: ScalaUDF if u.function.isInstanceOf[InPolygonRangeListUDF] =>
+        if (u.children.size != 2) {
+          throw new MalformedCarbonCommandException("Expect two strings in polygon range list")
+        }
+        Some(InPolygonRangeList(u.children.head.toString(), u.children.last.toString()))
+
       case or@Or(left, right) =>
 
         val leftFilter = translateFilter(left, true)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index f4e89f3..c51d3d7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -42,8 +42,11 @@ import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.geo.{GeoUtils, InPolygon}
+import org.apache.carbondata.geo.{GeoUtils, InPolygon, InPolygonList, InPolygonRangeList, InPolylineList}
 import org.apache.carbondata.geo.scan.expression.{PolygonExpression => CarbonPolygonExpression}
+import org.apache.carbondata.geo.scan.expression.{PolygonListExpression => CarbonPolygonListExpression}
+import org.apache.carbondata.geo.scan.expression.{PolylineListExpression => CarbonPolylineListExpression}
+import org.apache.carbondata.geo.scan.expression.{PolygonRangeListExpression => CarbonPolygonRangeListExpression}
 import org.apache.carbondata.index.{TextMatch, TextMatchLimit}
 
 /**
@@ -141,6 +144,16 @@ object CarbonFilters {
         case InPolygon(queryString) =>
           val (columnName, instance) = GeoUtils.getGeoHashHandler(tableProperties)
           Some(new CarbonPolygonExpression(queryString, columnName, instance))
+        case InPolygonList(polygonListString, opType) =>
+          val (columnName, instance) = GeoUtils.getGeoHashHandler(tableProperties)
+          Some(new CarbonPolygonListExpression(polygonListString, opType, columnName, instance))
+        case InPolylineList(polylineListString, buffer) =>
+          val (columnName, instance) = GeoUtils.getGeoHashHandler(tableProperties)
+          Some(new CarbonPolylineListExpression(polylineListString, buffer.toFloat,
+              columnName, instance))
+        case InPolygonRangeList(rangeListString, opType) =>
+          val (columnName, instance) = GeoUtils.getGeoHashHandler(tableProperties)
+          Some(new CarbonPolygonRangeListExpression(rangeListString, opType, columnName, instance))
         case _ => None
       }
     }
diff --git a/integration/spark/src/test/resources/geodata2.csv b/integration/spark/src/test/resources/geodata2.csv
new file mode 100644
index 0000000..3e6dce2
--- /dev/null
+++ b/integration/spark/src/test/resources/geodata2.csv
@@ -0,0 +1,31 @@
+timevalue,longitude,latitude
+1575428400000,120177080,30326882
+1575428400000,120180685,30326327
+1575428400000,120184976,30327105
+1575428400000,120189311,30327549
+1575428400000,120194460,30329698
+1575428400000,120186965,30329133
+1575428400000,120177481,30328911
+1575428400000,120169713,30325614
+1575428400000,120164563,30322243
+1575428400000,120171558,30319613
+1575428400000,120176365,30320687
+1575428400000,120179669,30323688
+1575428400000,120181001,30320761
+1575428400000,120187094,30323540
+1575428400000,120193574,30323651
+1575428400000,120186192,30320132
+1575428400000,120190055,30317464
+1575428400000,120195376,30318094
+1575428400000,120160786,30317094
+1575428400000,120168211,30318057
+1575428400000,120173618,30316612
+1575428400000,120181001,30317316
+1575428400000,120185162,30315908
+1575428400000,120192415,30315871
+1575428400000,120161902,30325614
+1575428400000,120164306,30328096
+1575428400000,120197093,30325985
+1575428400000,120196020,30321651
+1575428400000,120198638,30323540
+1575428400000,120165421,30314834
\ No newline at end of file
diff --git a/integration/spark/src/test/resources/geodataWithCorrectSpatialIndex.csv b/integration/spark/src/test/resources/geodataWithCorrectSpatialIndex.csv
new file mode 100644
index 0000000..e1e68ef
--- /dev/null
+++ b/integration/spark/src/test/resources/geodataWithCorrectSpatialIndex.csv
@@ -0,0 +1,17 @@
+mygeohash,timevalue,longitude,latitude
+855280799612,1575428400000,116285807,40084087
+855283635086,1575428400000,116372142,40129503
+855279346102,1575428400000,116187332,39979316
+855282156308,1575428400000,116337069,39951887
+855283640154,1575428400000,116359102,40154684
+855282440834,1575428400000,116736367,39970323
+855282468370,1575428400000,116720179,40009893
+855283633205,1575428400000,116346961,40133550
+855279270226,1575428400000,116302895,39930753
+855279368850,1575428400000,116288955,39999101
+855280812709,1575428400000,116176090,40129953
+855282443862,1575428400000,116725575,39981115
+855280927196,1575428400000,116266922,40179415
+855283640110,1575428400000,116353706,40156483
+855282072206,1575428400000,116362699,39942444
+855282157702,1575428400000,116325378,39963129
diff --git a/integration/spark/src/test/resources/geodataWithErrorSpatialIndex.csv b/integration/spark/src/test/resources/geodataWithErrorSpatialIndex.csv
new file mode 100644
index 0000000..8d411b2
--- /dev/null
+++ b/integration/spark/src/test/resources/geodataWithErrorSpatialIndex.csv
@@ -0,0 +1,17 @@
+mygeohash,timevalue,longitude,latitude
+0,1575428400000,116285807,40084087
+0,1575428400000,116372142,40129503
+0,1575428400000,116187332,39979316
+0,1575428400000,116337069,39951887
+0,1575428400000,116359102,40154684
+0,1575428400000,116736367,39970323
+0,1575428400000,116720179,40009893
+0,1575428400000,116346961,40133550
+0,1575428400000,116302895,39930753
+0,1575428400000,116288955,39999101
+0,1575428400000,116176090,40129953
+0,1575428400000,116725575,39981115
+0,1575428400000,116266922,40179415
+0,1575428400000,116353706,40156483
+0,1575428400000,116362699,39942444
+0,1575428400000,116325378,39963129
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index fa383f3..2ea56db 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.geo
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -200,10 +202,6 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
          | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
          | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
          | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
-         | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
-         | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
-         | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
-         | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
          | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
        """.stripMargin)
     val descTable = sql(s"describe formatted $table1").collect
@@ -235,8 +233,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   test("test geo table filter by geo spatial index column") {
     createTable()
     loadData()
-    checkAnswer(sql(s"select *from $table1 where mygeohash = '2196036'"),
-      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+    checkAnswer(sql(s"select *from $table1 where mygeohash = '855282156308'"),
+      Seq(Row(855282156308L, 1575428400000L, 116337069, 39951887)))
   }
 
   test("test polygon query") {
@@ -276,17 +274,14 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
          | 'SPATIAL_INDEX.spatial.sourcecolumns'='longitude, latitude',
          | 'SPATIAL_INDEX.spatial.originLatitude'='39.832277',
          | 'SPATIAL_INDEX.spatial.gridSize'='60',
-         | 'SPATIAL_INDEX.spatial.minLongitude'='115.811865',
-         | 'SPATIAL_INDEX.spatial.maxLongitude'='116.782233',
-         | 'SPATIAL_INDEX.spatial.minLatitude'='39.832277',
-         | 'SPATIAL_INDEX.spatial.maxLatitude'='40.225281',
          | 'SPATIAL_INDEX.spatial.conversionRatio'='1000000')
        """.stripMargin)
     loadData(sourceTable)
     createTable(targetTable)
+    // INSERT INTO keeps the SPATIAL_INDEX column from sourceTable instead of generating it internally
     sql(s"insert into  $targetTable select * from $sourceTable")
-    checkAnswer(sql(s"select *from $targetTable where mygeohash = '2196036'"),
-      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+    checkAnswer(sql(s"select *from $targetTable where mygeohash = '233137655761'"),
+      Seq(Row(233137655761L, 1575428400000L, 116337069, 39951887)))
   }
 
   test("test insert into non-geo table select from geo table") {
@@ -302,8 +297,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
         """)
     sql(s"insert into  $targetTable select * from $sourceTable")
     checkAnswer(
-      sql(s"select * from $targetTable where spatial='2196036'"),
-      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+      sql(s"select * from $targetTable where spatial='855282156308'"),
+      Seq(Row(855282156308L, 1575428400000L, 116337069, 39951887)))
   }
 
   test("test insert into table select from another table with target table sort scope as global") {
@@ -321,25 +316,25 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 
   test("test block pruning for polygon query") {
     createTable()
-    sql(s"insert into $table1 select 0,1575428400000,116285807,40084087")
-    sql(s"insert into $table1 select 0,1575428400000,116372142,40129503")
-    sql(s"insert into $table1 select 0,1575428400000,116187332,39979316")
-    sql(s"insert into $table1 select 0,1575428400000,116337069,39951887")
-    sql(s"insert into $table1 select 0,1575428400000,116359102,40154684")
-    sql(s"insert into $table1 select 0,1575428400000,116736367,39970323")
-    sql(s"insert into $table1 select 0,1575428400000,116362699,39942444")
-    sql(s"insert into $table1 select 0,1575428400000,116325378,39963129")
-    sql(s"insert into $table1 select 0,1575428400000,116302895,39930753")
-    sql(s"insert into $table1 select 0,1575428400000,116288955,39999101")
+    sql(s"insert into $table1 select 855280799612,1575428400000,116285807,40084087")
+    sql(s"insert into $table1 select 855283635086,1575428400000,116372142,40129503")
+    sql(s"insert into $table1 select 855279346102,1575428400000,116187332,39979316")
+    sql(s"insert into $table1 select 855282156308,1575428400000,116337069,39951887")
+    sql(s"insert into $table1 select 855283640154,1575428400000,116359102,40154684")
+    sql(s"insert into $table1 select 855282440834,1575428400000,116736367,39970323")
+    sql(s"insert into $table1 select 855282072206,1575428400000,116362699,39942444")
+    sql(s"insert into $table1 select 855282157702,1575428400000,116325378,39963129")
+    sql(s"insert into $table1 select 855279270226,1575428400000,116302895,39930753")
+    sql(s"insert into $table1 select 855279368850,1575428400000,116288955,39999101")
     val df = sql(s"select * from $table1 where IN_POLYGON('116.321011 " +
                  s"40.123503, 116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')")
     assert(df.rdd.getNumPartitions == 6)
-    checkAnswer(df, Seq(Row(733215, 1575428400000L, 116187332, 39979316),
-      Row(2160019, 1575428400000L, 116362699, 39942444),
-      Row(2170151, 1575428400000L, 116288955, 39999101),
-      Row(2174509, 1575428400000L, 116325378, 39963129),
-      Row(2196036, 1575428400000L, 116337069, 39951887),
-      Row(2361256, 1575428400000L, 116285807, 40084087)))
+    checkAnswer(df, Seq(Row(855279346102L, 1575428400000L, 116187332, 39979316),
+      Row(855282072206L, 1575428400000L, 116362699, 39942444),
+      Row(855279368850L, 1575428400000L, 116288955, 39999101),
+      Row(855282157702L, 1575428400000L, 116325378, 39963129),
+      Row(855282156308L, 1575428400000L, 116337069, 39951887),
+      Row(855280799612L, 1575428400000L, 116285807, 40084087)))
   }
 
   test("test insert into on table partitioned by timevalue column") {
@@ -354,16 +349,12 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
          | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
          | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
          | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
-         | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
-         | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
-         | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
-         | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
          | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
        """.stripMargin)
     sql(s"insert into $table1 select 0, 116337069, 39951887, 1575428400000")
     checkAnswer(
-      sql(s"select * from $table1 where mygeohash = '2196036'"),
-      Seq(Row(2196036, 116337069, 39951887, 1575428400000L)))
+      sql(s"select * from $table1 where mygeohash = '0'"),
+      Seq(Row(0, 116337069, 39951887, 1575428400000L)))
   }
 
   test("test polygon query on table partitioned by timevalue column") {
@@ -377,10 +368,6 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
            | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
            | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
            | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
-           | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
-           | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
-           | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
-           | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
            | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
        """.stripMargin)
     loadData()
@@ -390,6 +377,443 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       result)
   }
 
+  test("test insert into geo table with customized spatial index and polygon query") {
+    createTable()
+    sql(s"insert into $table1 select 855279346102,1575428400000,116187332,39979316")
+    sql(s"insert into $table1 select 855282072206,1575428400000,116362699,39942444")
+    sql(s"insert into $table1 select 855279368850,1575428400000,116288955,39999101")
+    sql(s"insert into $table1 select 855282157702,1575428400000,116325378,39963129")
+    sql(s"insert into $table1 select 855280799612,1575428400000,116285807,40084087")
+    sql(s"insert into $table1 select 0, 1575428400000, 116337069, 39951887")
+    checkAnswer(
+      sql(s"select * from $table1 where longitude = '116337069'"),
+      Seq(Row(0, 1575428400000L, 116337069, 39951887)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
+        s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+      Seq(Row(116187332, 39979316),
+        Row(116362699, 39942444),
+        Row(116288955, 39999101),
+        Row(116325378, 39963129),
+        Row(116285807, 40084087)))
+  }
+
+  test("test load data with customized correct spatial index to geo table and polygon query") {
+    createTable()
+    sql(s"""LOAD DATA local inpath '$resourcesPath/geodataWithCorrectSpatialIndex.csv'
+           |INTO TABLE $table1 OPTIONS ('DELIMITER'= ',')""".stripMargin)
+    checkAnswer(
+      sql(s"select * from $table1 where longitude = '116337069'"),
+      Seq(Row(855282156308L, 1575428400000L, 116337069, 39951887)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
+        s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+      result)
+  }
+
+  test("test load data with customized error spatial index to geo table and polygon query") {
+    createTable()
+    sql(s"""LOAD DATA local inpath '$resourcesPath/geodataWithErrorSpatialIndex.csv'
+           |INTO TABLE $table1 OPTIONS ('DELIMITER'= ',')""".stripMargin)
+    checkAnswer(
+      sql(s"select * from $table1 where longitude = '116337069'"),
+      Seq(Row(0, 1575428400000L, 116337069, 39951887)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
+        s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+      Seq())
+  }
+
+  test("test polygon list query: union of two polygons which are intersected") {
+    createTable()
+    loadData2()
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
+        s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
+      Seq(Row(120177080, 30326882),
+        Row(120180685, 30326327),
+        Row(120184976, 30327105),
+        Row(120176365, 30320687),
+        Row(120179669, 30323688),
+        Row(120181001, 30320761),
+        Row(120187094, 30323540),
+        Row(120186192, 30320132),
+        Row(120181001, 30317316)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
+        s"120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
+        s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946')"),
+      Seq(Row(120184976, 30327105),
+        Row(120189311, 30327549),
+        Row(120187094, 30323540),
+        Row(120193574, 30323651),
+        Row(120186192, 30320132),
+        Row(120190055, 30317464),
+        Row(120196020, 30321651)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
+        s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
+        s"POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
+        s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', " +
+        s"'OR')"),
+      Seq(Row(120177080, 30326882),
+        Row(120180685, 30326327),
+        Row(120184976, 30327105),
+        Row(120176365, 30320687),
+        Row(120179669, 30323688),
+        Row(120181001, 30320761),
+        Row(120187094, 30323540),
+        Row(120186192, 30320132),
+        Row(120181001, 30317316),
+        Row(120189311, 30327549),
+        Row(120193574, 30323651),
+        Row(120190055, 30317464),
+        Row(120196020, 30321651)))
+  }
+
+  test("test polygon list query: intersection of two polygons which are intersected") {
+    createTable()
+    loadData2()
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
+        s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
+      Seq(Row(120177080, 30326882),
+        Row(120180685, 30326327),
+        Row(120184976, 30327105),
+        Row(120176365, 30320687),
+        Row(120179669, 30323688),
+        Row(120181001, 30320761),
+        Row(120187094, 30323540),
+        Row(120186192, 30320132),
+        Row(120181001, 30317316)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
+        s"120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
+        s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946')"),
+      Seq(Row(120184976, 30327105),
+        Row(120189311, 30327549),
+        Row(120187094, 30323540),
+        Row(120193574, 30323651),
+        Row(120186192, 30320132),
+        Row(120190055, 30317464),
+        Row(120196020, 30321651)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
+        s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
+        s"POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
+        s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', " +
+        s"'AND')"),
+      Seq(Row(120184976, 30327105),
+        Row(120187094, 30323540),
+        Row(120186192, 30320132)))
+  }
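The OR/AND semantics these tests exercise can be reproduced with a plain ray-casting point-in-polygon check. This is a sketch of the expected result semantics only: Carbon itself evaluates IN_POLYGON_LIST over geohash ID ranges rather than per-point geometry.

```python
# Hedged sketch: ray casting point-in-polygon plus set union/intersection,
# mirroring IN_POLYGON_LIST('...', 'OR'/'AND') result semantics.

def point_in_polygon(pt, polygon):
    """Ray casting: toggle 'inside' each time a rightward ray crosses an edge."""
    x, y = pt
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        if (y1 > y) != (y2 > y):  # edge straddles the horizontal line through pt
            # x coordinate where the edge crosses that line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x_cross > x:
                inside = not inside
    return inside

def in_polygon_list(pt, polygons, op_type):
    """'OR' is the union of polygon results, 'AND' the intersection."""
    hits = [point_in_polygon(pt, p) for p in polygons]
    return any(hits) if op_type == "OR" else all(hits)
```

For example, a point inside both of two overlapping squares matches under both 'OR' and 'AND', while a point inside only one matches under 'OR' alone, which is exactly the relationship between the single-polygon and list results checked above.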
+
+  test("test polygon list query: intersection of two polygons which are not intersected") {
+    createTable()
+    loadData2()
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
+        s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
+      Seq(Row(120177080, 30326882),
+        Row(120180685, 30326327),
+        Row(120184976, 30327105),
+        Row(120176365, 30320687),
+        Row(120179669, 30323688),
+        Row(120181001, 30320761),
+        Row(120187094, 30323540),
+        Row(120186192, 30320132),
+        Row(120181001, 30317316)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
+        s"120.164492 30.326279,120.160629 30.318870,120.172259 30.315351,120.164492 30.326279')"),
+      Seq(Row(120164563, 30322243),
+        Row(120168211, 30318057)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
+        s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
+        s"POLYGON ((120.164492 30.326279,120.160629 30.318870,120.172259 30.315351," +
+        s"120.164492 30.326279))', " +
+        s"'AND')"),
+      Seq())
+  }
+
+  test("test polygon list query: intersection of two polygons when second polygon " +
+    "is completely in first polygon") {
+    createTable()
+    loadData2()
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
+        s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
+      Seq(Row(120177080, 30326882),
+        Row(120180685, 30326327),
+        Row(120184976, 30327105),
+        Row(120176365, 30320687),
+        Row(120179669, 30323688),
+        Row(120181001, 30320761),
+        Row(120187094, 30323540),
+        Row(120186192, 30320132),
+        Row(120181001, 30317316)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
+        s"120.179442 30.325205,120.177253 30.322242,120.180944 30.319426," +
+        s"120.186094 30.321834,120.179442 30.325205')"),
+      Seq(Row(120179669, 30323688),
+        Row(120181001, 30320761)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
+        s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
+        s"POLYGON ((120.179442 30.325205,120.177253 30.322242,120.180944 30.319426," +
+        s"120.186094 30.321834,120.179442 30.325205))', " +
+        s"'AND')"),
+      Seq(Row(120179669, 30323688),
+        Row(120181001, 30320761)))
+  }
+
+  test("test one polyline query") {
+    createTable()
+    loadData2()
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
+        s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, " +
+        s"120.190359 30.315388)', 65)"),
+      Seq(Row(120184976, 30327105),
+        Row(120197093, 30325985),
+        Row(120196020, 30321651),
+        Row(120198638, 30323540)))
+  }
+
+  test("test polyline list query, result is union of two polylines") {
+    createTable()
+    loadData2()
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
+        s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464)', 65)"),
+      Seq(Row(120184976, 30327105),
+        Row(120197093, 30325985)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
+        s"'LINESTRING (120.199242 30.324464, 120.190359 30.315388)', 65)"),
+      Seq(Row(120196020, 30321651),
+        Row(120198638, 30323540)))
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
+        s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464), " +
+        s"LINESTRING (120.199242 30.324464, 120.190359 30.315388)', 65)"),
+      Seq(Row(120184976, 30327105),
+        Row(120197093, 30325985),
+        Row(120196020, 30321651),
+        Row(120198638, 30323540)))
+  }
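As the tests above show, IN_POLYLINE_LIST matches points within the given buffer distance of any segment of any LINESTRING, and the full list result is the union over linestrings. A plain Euclidean sketch of that membership test (Carbon operates on longitude/latitude with the buffer in meters; the flat 2-D distance here is for illustration only):

```python
import math

def dist_point_segment(p, a, b):
    """Euclidean distance from point p to the segment ab."""
    px, py = p
    ax, ay = a
    bx, by = b
    dx, dy = bx - ax, by - ay
    if dx == 0 and dy == 0:  # degenerate segment: distance to the point a
        return math.hypot(px - ax, py - ay)
    # Projection parameter of p onto the line ab, clamped to the segment
    t = ((px - ax) * dx + (py - ay) * dy) / (dx * dx + dy * dy)
    t = max(0.0, min(1.0, t))
    return math.hypot(px - (ax + t * dx), py - (ay + t * dy))

def in_polyline_list(pt, polylines, buffer_dist):
    """True if pt lies within buffer_dist of any segment of any polyline."""
    return any(
        dist_point_segment(pt, line[i], line[i + 1]) <= buffer_dist
        for line in polylines
        for i in range(len(line) - 1)
    )
```

This matches the test structure: querying the two linestrings separately and taking the union of the results gives the same rows as the combined IN_POLYLINE_LIST call.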
+
+  test("test one range list query which has no overlapping ranges") {
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368848 855279368850, 855280799610 855280799612)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855280799612L, 116285807, 40084087)))
+  }
+
+  test("test one range list query which has overlapping ranges") {
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368848 855279368850, 855279368849 855279368852)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101)))
+  }
+
+  test("test one range list query when one range contains another range") {
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368848 855279368856, 855279368849 855279368852)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101)))
+  }
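The three range-list tests above pin down the 'OR' semantics of IN_POLYGON_RANGE_LIST: a row matches when its geo ID falls inside the union of the given [start, end] ranges, with overlapping or nested ranges collapsing into one. A sketch of that membership test (illustrative only, not Carbon's implementation):

```python
def in_range_list(geo_id, ranges):
    """OR semantics: geo_id matches if it falls in any [start, end] range.
    Overlapping or nested ranges are first merged, mirroring the dedup that
    the 'overlapping range' tests rely on."""
    merged = []
    for start, end in sorted(ranges):
        if merged and start <= merged[-1][1]:
            # Current range overlaps or is nested in the previous merged range
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return any(s <= geo_id <= e for s, e in merged)
```

With the overlapping ranges from the test, (855279368848, 855279368850) and (855279368849, 855279368852) merge into a single range, so the geo ID 855279368850 matches exactly once, which is why the expected result contains only one row.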
+
+  test("test two range lists query: union of two range lists which are intersected") {
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+        s"855282156300 855282157400)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855280799612L, 116285807, 40084087),
+        Row(855282156308L, 116337069, 39951887)))
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
+        s"855282156301 855282157800)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855282156308L, 116337069, 39951887),
+        Row(855282157702L, 116325378, 39963129)))
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+        s"855282156300 855282157400), " +
+        s"RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
+        s"855282156301 855282157800)', " +
+        s"'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855280799612L, 116285807, 40084087),
+        Row(855282156308L, 116337069, 39951887),
+        Row(855282157702L, 116325378, 39963129)))
+  }
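Stepping out of the patch for a moment: the 'OR' and 'AND' operators in these tests behave like a union and an intersection of the geohash-id intervals. The sketch below reproduces that combination with the ids from the tests above; it is an inference from the expected results, with illustrative names, not the actual `IN_POLYGON_RANGE_LIST` code path in carbondata-geo.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of combining two RANGELISTs of [start, end] geohash-id intervals:
// 'OR' unions them, 'AND' keeps only the overlap. Names are illustrative.
public class RangeListCombineSketch {
  // Merge possibly overlapping [start, end] intervals into a sorted union.
  static List<long[]> union(List<long[]> ranges) {
    ranges.sort(Comparator.comparingLong(r -> r[0]));
    List<long[]> merged = new ArrayList<>();
    for (long[] r : ranges) {
      if (!merged.isEmpty() && r[0] <= merged.get(merged.size() - 1)[1]) {
        long[] last = merged.get(merged.size() - 1);
        last[1] = Math.max(last[1], r[1]);   // extend the previous interval
      } else {
        merged.add(new long[]{r[0], r[1]});
      }
    }
    return merged;
  }

  // Pairwise intersection of two range lists.
  static List<long[]> intersect(List<long[]> a, List<long[]> b) {
    List<long[]> out = new ArrayList<>();
    for (long[] x : a) {
      for (long[] y : b) {
        long lo = Math.max(x[0], y[0]), hi = Math.min(x[1], y[1]);
        if (lo <= hi) out.add(new long[]{lo, hi});
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // The two touching intervals from the tests merge into one under 'OR'...
    List<long[]> both = new ArrayList<>();
    both.add(new long[]{855279368850L, 855279368852L});
    both.add(new long[]{855279368848L, 855279368850L});
    long[] or = union(both).get(0);
    System.out.println(or[0] + " " + or[1]); // 855279368848 855279368852
    // ...while 'AND' leaves only the shared id 855279368850, which is why
    // exactly that row survives the intersection queries.
  }
}
```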
+
+  test("test two range lists query: intersection of two range lists which are intersected") {
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+        s"855282156300 855282157400)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855280799612L, 116285807, 40084087),
+        Row(855282156308L, 116337069, 39951887)))
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
+        s"855282156301 855282157800)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855282156308L, 116337069, 39951887),
+        Row(855282157702L, 116325378, 39963129)))
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+        s"855282156300 855282157400), " +
+        s"RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
+        s"855282156301 855282157800)', " +
+        s"'AND')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855282156308L, 116337069, 39951887)))
+  }
+
+  test("test two range lists query: intersection of two range lists which are not intersected") {
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+        s"855282156300 855282157400)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855280799612L, 116285807, 40084087),
+        Row(855282156308L, 116337069, 39951887)))
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368830 855279368840, 855280799613 855280799615, " +
+        s"855282157700 855282157800)', 'OR')"),
+      Seq(Row(855282157702L, 116325378, 39963129)))
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+        s"855282156300 855282157400), " +
+        s"RANGELIST (855279368830 855279368840, 855280799613 855280799615, " +
+        s"855282157700 855282157800)', " +
+        s"'AND')"),
+      Seq())
+  }
+
+  test("test two range lists query: intersection of two range lists when second range list " +
+      "is completely in first range list") {
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+        s"855282156300 855282157400)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855280799612L, 116285807, 40084087),
+        Row(855282156308L, 116337069, 39951887)))
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
+        s"855282156301 855282157000)', 'OR')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855282156308L, 116337069, 39951887)))
+    checkAnswer(
+      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+        s"'RANGELIST (855279368840 855279368852, 855280799610 855280799620, " +
+        s"855282156300 855282157400), " +
+        s"RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
+        s"855282156301 855282157000)', " +
+        s"'AND')"),
+      Seq(Row(855279368850L, 116288955, 39999101),
+        Row(855282156308L, 116337069, 39951887)))
+  }
+
+  test("test transforming GeoId to GridXY") {
+    checkAnswer(
+      sql(s"select GeoIdToGridXy(855279270226) as GridXY"),
+      Seq(Row(Seq(613089, 722908))))
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select longitude, latitude, mygeohash, GeoIdToGridXy(mygeohash) as GridXY " +
+        s"from $table1 where mygeohash = 855279270226"),
+      Seq(Row(116302895, 39930753, 855279270226L, Seq(613089, 722908))))
+  }
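A note on the expected values: the grid coordinates appear to be the two de-interleaved halves of the geohash id, i.e. a Z-order (Morton) code with grid X in the odd bits and grid Y in the even bits. The sketch below reproduces `GeoIdToGridXy(855279270226) = [613089, 722908]` from the test on that assumption; it is inferred from the values, not quoted from the GeoHashUtils implementation.

```java
// Sketch: de-interleave a Z-order (Morton) geohash id into grid (x, y).
// Odd bits -> grid X, even bits -> grid Y, inferred from the test values.
public class GeoIdMortonSketch {
  // Collect every second bit of code, starting at 'offset' (0 = even bits).
  static long deinterleave(long code, int offset) {
    long v = 0;
    for (int i = 0; i < 32; i++) {
      v |= ((code >>> (2 * i + offset)) & 1L) << i;
    }
    return v;
  }

  public static void main(String[] args) {
    long geoId = 855279270226L;
    long gridX = deinterleave(geoId, 1); // odd bits
    long gridY = deinterleave(geoId, 0); // even bits
    System.out.println(gridX + ", " + gridY); // 613089, 722908
  }
}
```

Interleaving the two results back together reproduces the original id, which is consistent with a quad-tree cell numbering.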
+
+  test("test transforming latitude and longitude to GeoId") {
+    checkAnswer(
+      sql(s"select LatLngToGeoId(39930753, 116302895, 39.832277, 50) as geoId"),
+      Seq(Row(855279270226L)))
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select longitude, latitude, mygeohash, " +
+        s"LatLngToGeoId(latitude, longitude, 39.832277, 50) as geoId " +
+        s"from $table1 where mygeohash = 855279270226"),
+      Seq(Row(116302895, 39930753, 855279270226L, 855279270226L)))
+  }
+
+  test("test transforming GeoId to latitude and longitude") {
+    checkAnswer(
+      sql(s"select GeoIdToLatLng(855279270226, 39.832277, 50) as LatitudeAndLongitude"),
+      Seq(Row(Seq(39.930529, 116.303093))))
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select longitude, latitude, mygeohash, " +
+        s"GeoIdToLatLng(mygeohash, 39.832277, 50) as LatitudeAndLongitude " +
+        s"from $table1 where mygeohash = 855279270226"),
+      Seq(Row(116302895, 39930753, 855279270226L, Seq(39.930529, 116.303093))))
+  }
+
+  test("test transforming to upper layer geoId") {
+    checkAnswer(
+      sql(s"select ToUpperLayerGeoId(855279270226) as upperLayerGeoId"),
+      Seq(Row(213819817556L)))
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select longitude, latitude, mygeohash, " +
+        s"ToUpperLayerGeoId(mygeohash) as upperLayerGeoId " +
+        s"from $table1 where mygeohash = 855279270226"),
+      Seq(Row(116302895, 39930753, 855279270226L, 213819817556L)))
+  }
+
+  test("test transforming polygon string to rangeList") {
+    checkAnswer(
+      sql(s"select ToRangeList('116.321011 40.123503, 116.320311 40.122503, " +
+        s"116.321111 40.121503, 116.321011 40.123503', 39.832277, 50) as rangeList"),
+      Seq(Row(mutable.WrappedArray.make(Array(
+        mutable.WrappedArray.make(Array(855280833998L, 855280833998L)),
+        mutable.WrappedArray.make(Array(855280834020L, 855280834020L)),
+        mutable.WrappedArray.make(Array(855280834022L, 855280834022L))))))
+    )
+  }
+
   override def afterEach(): Unit = {
     drop()
   }
@@ -414,10 +838,6 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
            | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
            | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
            | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
-           | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
-           | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
-           | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
-           | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
            | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
        """.stripMargin)
   }
@@ -426,5 +846,9 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
    sql(s"""LOAD DATA local inpath '$resourcesPath/geodata.csv' INTO TABLE $tableName OPTIONS
            |('DELIMITER'= ',')""".stripMargin)
   }
-}
 
+  def loadData2(tableName : String = table1): Unit = {
+    sql(s"""LOAD DATA local inpath '$resourcesPath/geodata2.csv' INTO TABLE $tableName OPTIONS
+           |('DELIMITER'= ',')""".stripMargin)
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/SpatialIndexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/SpatialIndexFieldConverterImpl.java
index 6f05930..0ca5aeb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/SpatialIndexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/SpatialIndexFieldConverterImpl.java
@@ -95,7 +95,9 @@ public class SpatialIndexFieldConverterImpl extends MeasureFieldConverterImpl {
   @Override
   public void convert(CarbonRow row, BadRecordLogHolder logHolder)
       throws CarbonDataLoadingException {
-    row.update(generateIndexValue(row), index);
+    if (row.getData()[index] == null) {
+      row.update(generateIndexValue(row), index);
+    }
     super.convert(row, logHolder);
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 23caf31..7ea962a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.loading.parser.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -81,7 +82,8 @@ public class RowParserImpl implements RowParser {
     int k = 0;
     for (int i = 0; i < fields.length; i++) {
       if (spatialProperty != null && fields[i].getColumn().getColName()
-          .equalsIgnoreCase(spatialProperty.trim())) {
+          .equalsIgnoreCase(spatialProperty.trim()) &&
+          !Arrays.asList(header).contains(spatialProperty.trim())) {
         // Spatial index columns are not present in the header. So set
         // the input mapping as -1 for the field and continue
         input[k] = fields[i];
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index c8fa656..243a105 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -415,16 +415,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
    private Object[] convertToNoDictionaryToBytesWithoutReArrange(Object[] data,
         DataField[] dataFields) {
       Object[] newData = new Object[dataFields.length];
-      Map<String, String> properties =
-          configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
-              .getTableProperties();
-      String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
       // now dictionary is removed, no need of no dictionary mapping
       for (int i = 0; i < dataFields.length; i++) {
-        if (spatialProperty != null && dataFields[i].getColumn().getColName()
-            .equalsIgnoreCase(spatialProperty.trim())) {
-          continue;
-        }
         if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
           // keep the no dictionary measure column as original data
           newData[i] = data[i];
