This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c3bc4d  [CARBONDATA-3833] Make geoID visible
7c3bc4d is described below

commit 7c3bc4d130dd1a7ae84f3b8b0f7b144138ef442b
Author: Shreelekhya <[email protected]>
AuthorDate: Tue May 26 23:06:01 2020 +0530

    [CARBONDATA-3833] Make geoID visible
    
    Why is this PR needed?
    To make the geohash column visible to the user.
    
    What changes were proposed in this PR?
    The generated geohash column is now included in the table schema. Validation
    was added to prevent altering or dropping the geohash column. Indexes, MVs,
    and other table properties are not supported on this column.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3774
---
 .../ThriftWrapperSchemaConverterImpl.java          |   2 -
 .../core/metadata/schema/table/CarbonTable.java    |   4 +-
 .../metadata/schema/table/column/CarbonColumn.java |   8 -
 .../metadata/schema/table/column/ColumnSchema.java |  24 ---
 .../apache/carbondata/core/util/CarbonUtil.java    |   1 -
 .../ThriftWrapperSchemaConverterImplTest.java      |   5 -
 format/src/main/thrift/schema.thrift               |   5 -
 .../org/apache/carbondata/spark/util/Util.java     |   2 +-
 .../spark/load/DataLoadProcessorStepOnSpark.scala  |  10 -
 .../spark/rdd/CarbonDataRDDFactory.scala           |   2 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala    |   8 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |  24 +++
 .../spark/sql/catalyst/CarbonParserUtil.scala      |   1 +
 .../command/carbonTableSchemaCommon.scala          |   1 -
 .../command/index/CarbonCreateIndexCommand.scala   |   8 +
 .../management/CarbonInsertIntoCommand.scala       |  27 +--
 .../command/management/CommonLoadUtils.scala       |   2 +-
 .../mutation/CarbonProjectForUpdateCommand.scala   |   4 +
 .../schema/CarbonAlterTableAddColumnCommand.scala  |   3 -
 ...nAlterTableColRenameDataTypeChangeCommand.scala |   4 +-
 .../schema/CarbonAlterTableDropColumnCommand.scala |   3 +-
 .../command/view/CarbonCreateMVCommand.scala       |  15 ++
 .../spark/sql/hive/CarbonFileMetastore.scala       |  13 +-
 .../org/apache/spark/sql/hive/CarbonRelation.scala |   6 +-
 .../secondaryindex/command/SICreationCommand.scala |   8 +
 .../org/apache/spark/util/AlterTableUtil.scala     |  32 ++--
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 206 +++++++++++++++++++--
 .../converter/impl/FieldEncoderFactory.java        |  12 +-
 .../loading/converter/impl/RowConverterImpl.java   |  29 ++-
 .../loading/parser/impl/RowParserImpl.java         |  11 +-
 .../InputProcessorStepWithNoConverterImpl.java     |  27 ++-
 .../processing/util/CarbonDataProcessorUtil.java   |   6 +-
 32 files changed, 371 insertions(+), 142 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 80fb2e1..255fa6f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -207,7 +207,6 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     thriftColumnSchema.setInvisible(wrapperColumnSchema.isInvisible());
     thriftColumnSchema.setColumnReferenceId(wrapperColumnSchema.getColumnReferenceId());
     thriftColumnSchema.setSchemaOrdinal(wrapperColumnSchema.getSchemaOrdinal());
-    thriftColumnSchema.setSpatialColumn(wrapperColumnSchema.isSpatialColumn());
     if (wrapperColumnSchema.isSortColumn()) {
       Map<String, String> properties = wrapperColumnSchema.getColumnProperties();
       if (null == properties) {
@@ -506,7 +505,6 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     wrapperColumnSchema.setInvisible(externalColumnSchema.isInvisible());
     wrapperColumnSchema.setColumnReferenceId(externalColumnSchema.getColumnReferenceId());
     wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
-    wrapperColumnSchema.setSpatialColumn(externalColumnSchema.isSpatialColumn());
     wrapperColumnSchema.setSortColumn(false);
     Map<String, String> properties = externalColumnSchema.getColumnProperties();
     if (properties != null) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 81ce39a..cbaf18a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -300,9 +300,7 @@ public class CarbonTable implements Serializable, Writable {
   private void fillCreateOrderColumn() {
     List<CarbonColumn> columns = new ArrayList<CarbonColumn>();
     for (CarbonDimension dimension : visibleDimensions) {
-      if (!dimension.getColumnSchema().isSpatialColumn()) {
-        columns.add(dimension);
-      }
+      columns.add(dimension);
     }
     columns.addAll(visibleMeasures);
     columns.sort(new Comparator<CarbonColumn>() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index ae2775a..40015a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -167,14 +167,6 @@ public class CarbonColumn implements Serializable {
   }
 
   /**
-   * Checks if it is spatial index column
-   * @return Returns True if the column is an spatial index column. Otherwise returns False.
-   */
-  public boolean isSpatialColumn() {
-    return columnSchema.isSpatialColumn();
-  }
-
-  /**
    * @return column property map
    */
   public Map<String, String> getColumnProperties() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index dd4b4a0..88b55ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -122,11 +122,6 @@ public class ColumnSchema implements Serializable, Writable, Cloneable {
   private boolean isSortColumn = false;
 
   /**
-   *  Whether it is a spatial index column
-   */
-  private boolean spatialColumn = false;
-
-  /**
    * aggregate function used in pre aggregate table
    */
   private String aggFunction = "";
@@ -538,7 +533,6 @@ public class ColumnSchema implements Serializable, Writable, Cloneable {
       }
     }
     out.writeBoolean(isLocalDictColumn);
-    out.writeBoolean(spatialColumn);
   }
 
   @Override
@@ -588,7 +582,6 @@ public class ColumnSchema implements Serializable, Writable, Cloneable {
       }
     }
     this.isLocalDictColumn = in.readBoolean();
-    this.spatialColumn = in.readBoolean();
   }
 
   /**
@@ -612,21 +605,4 @@ public class ColumnSchema implements Serializable, Writable, Cloneable {
       throw new RuntimeException("Error occur while cloning ColumnSchema", e);
     }
   }
-
-  /**
-   * Checks whether it is a spatial index column.
-   * @return Returns True if the column is a spatial index column. Otherwise returns False.
-   */
-  public boolean isSpatialColumn() {
-    return spatialColumn;
-  }
-
-  /**
-   * Set the column spatial index property. True or False to indicate column is a spatial index
-   * column or not respectively.
-   * @param spatialColumn True or False
-   */
-  public void setSpatialColumn(boolean spatialColumn) {
-    this.spatialColumn = spatialColumn;
-  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index fa77fc4..12d96e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1966,7 +1966,6 @@ public final class CarbonUtil {
     wrapperColumnSchema.setScale(externalColumnSchema.getScale());
     wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
     wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
-    wrapperColumnSchema.setSpatialColumn(externalColumnSchema.isSpatialColumn());
     Map<String, String> properties = externalColumnSchema.getColumnProperties();
     if (properties != null) {
       if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index fadc2ad..236c3fc 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -158,11 +158,6 @@ public class ThriftWrapperSchemaConverterImplTest {
         return thriftColumnSchema;
       }
 
-      @Mock public org.apache.carbondata.format.ColumnSchema setSpatialColumn(
-          boolean spatialColumn) {
-        return thriftColumnSchema;
-      }
-
       @Mock public String getColumn_id() {
         return "1";
       }
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index b7fabef..fc3a2ff 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -133,11 +133,6 @@ struct ColumnSchema{
   **/
   /** Deprecated */
        17: optional list<ParentColumnTableRelation> parentColumnTableRelations;
-
-  /**
-   * To specify if it is a spatial index column. Its Default value is false
-        */
-       18: optional bool spatialColumn;
 }
 
 /**
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
index 0fce43a..30c2b5f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -111,7 +111,7 @@ public class Util {
     List<ColumnSchema> columns = table.getTableInfo().getFactTable().getListOfColumns();
     List<ColumnSchema> validColumnSchema = new ArrayList<>();
     for (ColumnSchema column : columns) {
-      if (!column.isInvisible() && !column.isSpatialColumn() && !column.isComplexColumn()) {
+      if (!column.isInvisible() && !column.isComplexColumn()) {
         validColumnSchema.add(column);
       }
     }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 05b9ac3..6851959 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -209,16 +209,6 @@ object DataLoadProcessorStepOnSpark {
           row = new CarbonRow(rowParser.parseRow(rows.next()))
         }
         row = rowConverter.convert(row)
-        if (row != null) {
-          // In case of partition, after Input processor and converter steps, all the rows are given
-          // to hive to create partition folders. As hive is unaware of non-schema columns,
-          // should discard those columns from rows and return.
-          val schemaColumnValues = row.getData.zipWithIndex.collect {
-            case (data, index) if !conf.getDataFields()(index).getColumn.isSpatialColumn =>
-              data
-          }
-          row.setData(schemaColumnValues)
-        }
         rowCounter.add(1)
         row
       }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 540de75..98e8152 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -375,7 +375,7 @@ object CarbonDataRDDFactory {
               .getFactTable
               .getListOfColumns
               .asScala
-              .filterNot(col => col.isInvisible || col.isSpatialColumn || col.isComplexColumn)
+              .filterNot(col => col.isInvisible || col.isComplexColumn)
             val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
               colSchema,
               scanResultRdd.get,
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index b30f104..38f2f15 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -71,10 +71,10 @@ object CarbonSparkUtil {
     val fields = new Array[String](
       carbonRelation.dimensionsAttr.size + carbonRelation.measureAttr.size)
     val carbonTable = carbonRelation.carbonTable
-    val columnSchemas: mutable.Buffer[ColumnSchema] = carbonTable.getTableInfo.getFactTable.
-      getListOfColumns.asScala
-      .filter(cSchema => !cSchema.isInvisible && cSchema.getSchemaOrdinal != -1 &&
-                         !cSchema.isSpatialColumn).sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
+    val columnSchemas: mutable.Buffer[ColumnSchema] = carbonTable.getTableInfo
+      .getFactTable.getListOfColumns.asScala
+      .filter(cSchema => !cSchema.isInvisible && cSchema.getSchemaOrdinal != -1)
+      .sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
     val columnList = columnSchemas.toList.asJava
     carbonRelation.dimensionsAttr.foreach(attr => {
       val carbonColumn = carbonTable.getColumnByName(attr.name)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index e55f205..e998491 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -654,6 +654,30 @@ object CommonUtil {
   }
 
   /**
+   * This method will validate for spatial column
+   *
+   * @param properties
+   */
+  def validateForSpatialTypeColumn(properties: Map[String, String]): Unit = {
+    // Do not allow to set table properties on spatial column.
+    // Spatial column is only allowed in sort columns and spatial index property
+    val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+    if (spatialProperty.isDefined) {
+      val spatialColumn = spatialProperty.get.trim
+      properties.foreach { case (key, value) =>
+        if (!key.startsWith(CarbonCommonConstants.SPATIAL_INDEX) &&
+            !key.equalsIgnoreCase(CarbonCommonConstants.SORT_COLUMNS) &&
+            value.contains(spatialColumn)) {
+          val errorMessage =
+            s"$spatialColumn is a spatial index column and is not allowed for 
" +
+            s"the option(s): $key"
+          throw new MalformedCarbonCommandException(errorMessage)
+        }
+      }
+    }
+  }
+
+  /**
    * This method will validate the cache level
    *
    * @param cacheLevel
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index 0c67aeb..55a6e9d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -217,6 +217,7 @@ object CarbonParserUtil {
 
     // do not allow below key words as column name
     validateColumnNames(allFields)
+    CommonUtil.validateForSpatialTypeColumn(tableProperties)
 
     fields.zipWithIndex.foreach { case (field, index) =>
       field.schemaOrdinal = index
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e551301..1ecb004 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -648,7 +648,6 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setScale(field.scale)
     columnSchema.setSchemaOrdinal(field.schemaOrdinal)
     columnSchema.setSortColumn(false)
-    columnSchema.setSpatialColumn(field.spatialIndex)
     if (isVarcharColumn(colName)) {
       columnSchema.setDataType(DataTypes.VARCHAR)
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
index 16df7bb..8ecf107 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
@@ -285,7 +285,15 @@ case class CarbonCreateIndexCommand(
       indexProvider: String): java.util.List[CarbonColumn] = {
     val indexCarbonColumns = parentTable.getIndexedColumns(indexColumns)
     val unique: util.Set[String] = new util.HashSet[String]
+    val properties = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+    val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     for (indexColumn <- indexCarbonColumns.asScala) {
+      if (spatialProperty.isDefined &&
+          indexColumn.getColName.equalsIgnoreCase(spatialProperty.get.trim)) {
+        throw new MalformedIndexCommandException(String.format(
+          "Spatial Index column is not supported, column '%s' is spatial 
column",
+          indexColumn.getColName))
+      }
       if (indexProvider.equalsIgnoreCase(IndexType.LUCENE.getIndexProviderName)) {
         // validate whether it is string column.
         if (indexColumn.getDataType != DataTypes.STRING) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 01afa5d..a87042a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -35,6 +35,7 @@ import org.apache.spark.util.CausedBy
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
@@ -471,6 +472,8 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
     var reArrangedIndex: Seq[Int] = Seq()
     var selectedColumnSchema: Seq[ColumnSchema] = Seq()
     var partitionIndex: Seq[Int] = Seq()
+    val properties = tableInfo.getFactTable.getTableProperties.asScala
+    val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     // internal order ColumnSchema (non-flat structure)
     val columnSchema = (table.getVisibleDimensions.asScala ++
                         table.getVisibleMeasures.asScala).map(_.getColumnSchema)
@@ -494,20 +497,20 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
     }
     columnSchema.foreach {
       col =>
-        if (col.isSpatialColumn) {
+        if (spatialProperty.isDefined &&
+            col.getColumnName.equalsIgnoreCase(spatialProperty.get.trim)) {
           carbonLoadModel.setNonSchemaColumnsPresent(true)
+        }
+        var skipPartitionColumn = false
+        if (partitionColumnNames != null &&
+            partitionColumnNames.contains(col.getColumnName)) {
+          partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
+          skipPartitionColumn = true
         } else {
-          var skipPartitionColumn = false
-          if (partitionColumnNames != null &&
-              partitionColumnNames.contains(col.getColumnName)) {
-            partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
-            skipPartitionColumn = true
-          } else {
-            reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName)
-          }
-          if (!skipPartitionColumn) {
-            selectedColumnSchema = selectedColumnSchema :+ col
-          }
+          reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName)
+        }
+        if (!skipPartitionColumn) {
+          selectedColumnSchema = selectedColumnSchema :+ col
         }
     }
     if (partitionColumnSchema != null) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 15a0823..15428fa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -864,7 +864,7 @@ object CommonLoadUtils {
       // input data from csv files. Convert to logical plan
       val allCols = new ArrayBuffer[String]()
       // get only the visible dimensions from table
-      allCols ++= table.getVisibleDimensions.asScala.filterNot(_.isSpatialColumn).map(_.getColName)
+      allCols ++= table.getVisibleDimensions.asScala.map(_.getColName)
       allCols ++= table.getVisibleMeasures.asScala.map(_.getColName)
      StructType(
        allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 7d3225d..16d90ef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.execution.strategy.MixedFormatHandler
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{ArrayType, LongType}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.AlterTableUtil
+import scala.collection.JavaConverters._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -70,6 +72,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     setAuditTable(carbonTable)
     setAuditInfo(Map("plan" -> plan.simpleString))
+    // Do not allow spatial index and its source columns to be updated.
+    AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable, columns)
     columns.foreach { col =>
       val dataType = carbonTable.getColumnByName(col).getColumnSchema.getDataType
       if (dataType.isComplexType) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 75c21ae..8b79b70 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -62,9 +62,6 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         throw new MalformedCarbonCommandException(
           "alter table add column is not supported for index indexSchema")
       }
-      val alterColumns =
-        (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols).map(_.column)
-      AlterTableUtil.validateSpatialIndexColumn(carbonTable, alterColumns)
       val operationContext = new OperationContext
       val alterTableAddColumnListener = AlterTableAddColumnPreEvent(sparkSession, carbonTable,
         alterTableAddColumnsModel)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index ae80a12..1c27dd7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -127,8 +127,8 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
         throw new MalformedCarbonCommandException(
           "alter table column rename is not supported for index indexSchema")
       }
-      // Do not allow spatial index source columns to be changed.
-      AlterTableUtil.validateSpatialIndexSources(carbonTable,
+      // Do not allow spatial index column and its source columns to be changed.
+      AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable,
         List(alterTableColRenameAndDataTypeChangeModel.columnName))
       val operationContext = new OperationContext
       operationContext.setProperty("childTableColumnRename", 
childTableColumnRename)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 5c03871..2289aad 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -63,7 +63,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
           "alter table drop column is not supported for index indexSchema")
       }
       // Do not allow spatial index source columns to be dropped.
-      AlterTableUtil.validateSpatialIndexSources(carbonTable, alterTableDropColumnModel.columns)
+      AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable,
+        alterTableDropColumnModel.columns)
       val partitionInfo = carbonTable.getPartitionInfo()
       val tableColumns = carbonTable.getCreateOrderColumn().asScala
       if (partitionInfo != null) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 5be4ef7..9de0833 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -172,10 +172,25 @@ case class CarbonCreateMVCommand(
     val viewSchema = getOutputSchema(logicalPlan)
     val relatedTables = getRelatedTables(logicalPlan)
     val relatedTableList = toCarbonTables(session, relatedTables)
+    val inputCols = logicalPlan.output.map(x =>
+      x.name
+    ).toList
     val relatedTableNames = new util.ArrayList[String](relatedTableList.size())
     // Check if load is in progress in any of the parent table mapped to the indexSchema
     relatedTableList.asScala.foreach {
       table =>
+        val tableProperties = table.getTableInfo.getFactTable.getTableProperties.asScala
+        // validate for spatial index column
+        val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
+        if (spatialProperty.isDefined) {
+          val spatialColumn = spatialProperty.get.trim
+          if (inputCols.contains(spatialColumn)) {
+            val errorMessage =
+              s"$spatialColumn is a spatial index column and is not allowed 
for " +
+              s"the option(s): MATERIALIZED VIEW"
+            throw new MalformedCarbonCommandException(errorMessage)
+          }
+        }
         if (!table.getTableInfo.isTransactionalTable) {
           throw new MalformedCarbonCommandException(
             "Cannot create mv on non-transactional table")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 0b898eb..3caabb1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -31,9 +31,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
@@ -228,8 +230,17 @@ class CarbonFileMetastore extends CarbonMetaStore {
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
-        val catalogTable =
+        var catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", 
c).asInstanceOf[CatalogTable]
+        // Here, catalogTable will have spatial column in schema which is used to build carbon
+        // table. As spatial column is not supposed to be present in user-defined columns,
+        // removing it here. Later from tableproperties the column will be added in carbonTable.
+        val spatialProperty = catalogTable.properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+        if (spatialProperty.isDefined) {
+          val originalSchema = StructType(catalogTable.schema.
+            filterNot(_.name.equalsIgnoreCase(spatialProperty.get.trim)))
+          catalogTable = catalogTable.copy(schema = originalSchema)
+        }
         val tableInfo = CarbonSparkSqlParserUtil.buildTableInfoFromCatalogTable(
           catalogTable, false, sparkSession)
         val carbonTable = CarbonTable.buildFromTableInfo(tableInfo)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 508e910..264d06b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -49,11 +49,7 @@ case class CarbonRelation(
   }
 
   val dimensionsAttr: Seq[AttributeReference] = {
-    val sett = new LinkedHashSet(carbonTable
-      .getVisibleDimensions
-      .asScala
-      .filterNot(_.isSpatialColumn)
-      .asJava)
+    val sett = new LinkedHashSet(carbonTable.getVisibleDimensions.asScala.asJava)
     sett.asScala.toSeq.map(dim => {
       val dimension = carbonTable.getDimensionByName(dim.getColName)
       val output: DataType = dimension.getDataType.getName.toLowerCase match {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index a03345f..3b8a19b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -210,6 +210,14 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
             .get
         }")
       }
+      val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+      val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+      if (spatialProperty.isDefined) {
+        if (indexModel.columnNames.exists(x => x.equalsIgnoreCase(spatialProperty.get.trim))) {
+          throw new ErrorMessage(s"Secondary Index is not supported for 
Spatial index column:" +
+                                 s" ${ spatialProperty.get.trim }")
+        }
+      }
       if (indexModel.columnNames.exists(x => !dimNames.contains(x))) {
         throw new ErrorMessage(
           s"one or more specified index cols either does not exist or not a 
key column or complex" +
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 8d12a7c..2d2e711 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -422,8 +422,6 @@ object AlterTableUtil {
       properties.foreach { entry =>
         lowerCasePropertiesMap.put(entry._1.toLowerCase, entry._2)
       }
-      // validate the required cache level properties
-      validateColumnMetaCacheAndCacheLevel(carbonTable, lowerCasePropertiesMap)
       // get the latest carbon table
       // read the latest schema file
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
@@ -438,6 +436,12 @@ object AlterTableUtil {
       val tblPropertiesMap: mutable.Map[String, String] =
         thriftTable.fact_table.getTableProperties.asScala
 
+      // validate for spatial index column
+      CommonUtil.validateForSpatialTypeColumn(tblPropertiesMap ++ lowerCasePropertiesMap)
+
+      // validate the required cache level properties
+      validateColumnMetaCacheAndCacheLevel(carbonTable, lowerCasePropertiesMap)
+
       // validate the local dictionary properties
       validateLocalDictionaryProperties(lowerCasePropertiesMap, tblPropertiesMap, carbonTable)
 
@@ -625,7 +629,7 @@ object AlterTableUtil {
     if (propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
       val schemaList: util.List[ColumnSchema] = CarbonUtil
         .getColumnSchemaList(carbonTable.getVisibleDimensions.asScala
-          .filterNot(_.getColumnSchema.isSpatialColumn).asJava, carbonTable.getVisibleMeasures)
+          .asJava, carbonTable.getVisibleMeasures)
       val tableColumns: Seq[String] = schemaList.asScala
         .map(columnSchema => columnSchema.getColumnName)
       CommonUtil
@@ -1060,21 +1064,9 @@ object AlterTableUtil {
     }
   }
 
-  def validateSpatialIndexColumn(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
-    // Do not allow columns to be added with spatial index column name
-    val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
-    val indexProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
-    if (indexProperty.isDefined) {
-      indexProperty.get.split(",").map(_.trim).foreach(element =>
-        if (alterColumns.contains(element)) {
-          throw new MalformedCarbonCommandException(s"Column: $element is not allowed. " +
-            s"This column is present in ${CarbonCommonConstants.SPATIAL_INDEX} table property.")
-        })
-      }
-  }
-
-  def validateSpatialIndexSources(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
-    // Do not allow spatial index source columns to be altered
+  def validateColumnsWithSpatialIndexProperties(carbonTable: CarbonTable, alterColumns: Seq[String])
+  : Unit = {
+    // Do not allow spatial index column and its source columns to be altered
     val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
     val indexProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     if (indexProperty.isDefined) {
@@ -1082,9 +1074,9 @@ object AlterTableUtil {
         val srcColumns
         = properties.get(CarbonCommonConstants.SPATIAL_INDEX + s".$element.sourcecolumns")
         val common = alterColumns.intersect(srcColumns.get.split(",").map(_.trim))
-        if (common.nonEmpty) {
+        if (common.nonEmpty || alterColumns.contains(element)) {
           throw new MalformedCarbonCommandException(s"Columns present in " +
-            s"${CarbonCommonConstants.SPATIAL_INDEX} table property cannot be 
altered.")
+            s"${CarbonCommonConstants.SPATIAL_INDEX} table property cannot be 
altered/updated")
         }
       }
     }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index baf2f61..7235527 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -3,8 +3,7 @@ package org.apache.carbondata.geo
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedIndexCommandException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
 class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
@@ -75,6 +74,84 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       s"table."))
   }
 
+  test("test geo table with invalid table properties") {
+    var exception = intercept[MalformedCarbonCommandException](
+      createTable(table1, " 'RANGE_COLUMN'='timevalue', 'COLUMN_META_CACHE' = 
'mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): column_meta_cache"))
+
+    exception = intercept[MalformedCarbonCommandException](
+      createTable(table1, " 'NO_INVERTED_INDEX'='mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): no_inverted_index"))
+
+    exception = intercept[MalformedCarbonCommandException](
+      createTable(table1,
+        " 'SORT_COLUMNS'='mygeohash, timevalue ', 
'INVERTED_INDEX'='mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): inverted_index"))
+
+    exception = intercept[MalformedCarbonCommandException](
+      createTable(table1, " 'RANGE_COLUMN'='mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): range_column"))
+
+    exception = intercept[MalformedCarbonCommandException](
+      createTable(table1, " 'BUCKET_NUMBER'='10', 
'BUCKET_COLUMNS'='mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): bucket_columns"))
+  }
+
+  test("test alter table with invalid table properties") {
+    createTable()
+    var exception = intercept[RuntimeException](
+      sql(s"ALTER TABLE $table1 SET TBLPROPERTIES('SORT_COLUMNS'='mygeohash, 
timevalue ', " +
+          s"'INVERTED_INDEX'='mygeohash')"))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): inverted_index"))
+
+    exception = intercept[RuntimeException](
+      sql(s"ALTER TABLE $table1 SET 
TBLPROPERTIES('NO_INVERTED_INDEX'='mygeohash')"))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): no_inverted_index"))
+
+    exception = intercept[RuntimeException](
+      sql(s"ALTER TABLE $table1 SET TBLPROPERTIES('COLUMN_META_CACHE' = 
'mygeohash')"))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): column_meta_cache"))
+  }
+
+  test("test materialized view with spatial column") {
+    createTable()
+    loadData()
+    val exception = intercept[MalformedCarbonCommandException](sql(
+      s"CREATE MATERIALIZED VIEW view1 AS SELECT longitude, mygeohash FROM 
$table1"))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): MATERIALIZED VIEW"))
+  }
+
+  test("test geo table create index on spatial column") {
+    createTable()
+    loadData()
+    val exception = intercept[MalformedIndexCommandException](sql(
+      s"""
+         | CREATE INDEX bloom_index ON TABLE $table1 (mygeohash)
+         | AS 'bloomfilter'
+         | PROPERTIES('BLOOM_SIZE'='640000', 'BLOOM_FPP'='0.00001')
+      """.stripMargin))
+    assert(exception.getMessage.contains(
+      s"Spatial Index column is not supported, column 'mygeohash' is spatial 
column"))
+  }
+
   test("test geo table create and load and check describe formatted") {
     createTable()
     loadData()
@@ -90,6 +167,33 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
     }
   }
 
+  test("test geo table drop spatial index column") {
+    createTable()
+    loadData()
+    val exception = intercept[MalformedCarbonCommandException](
+      sql(s"alter table $table1 drop columns(mygeohash)"))
+    assert(exception.getMessage.contains(
+      s"Columns present in ${ CarbonCommonConstants.SPATIAL_INDEX } " +
+      s"table property cannot be altered/updated"))
+  }
+
+  test("test geo table alter spatial index column") {
+    createTable()
+    loadData()
+    val exception = intercept[MalformedCarbonCommandException](
+      sql(s"update $table1 set (mygeohash)=(111111) where longitude=116285807 
"))
+    assert(exception.getMessage.contains(
+      s"Columns present in ${ CarbonCommonConstants.SPATIAL_INDEX } " +
+      s"table property cannot be altered/updated"))
+  }
+
+  test("test geo table filter by geo spatial index column") {
+    createTable()
+    loadData()
+    checkAnswer(sql(s"select *from $table1 where mygeohash = '2196036'"),
+      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+  }
+
   test("test polygon query") {
     createTable()
     loadData()
@@ -112,6 +216,51 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       result)
   }
 
+  test("insert into table select from another geo table with different 
properties") {
+    val sourceTable = table1;
+    val targetTable = table2;
+    sql(
+      s"""
+         | CREATE TABLE $sourceTable(
+         | timevalue BIGINT,
+         | longitude LONG,
+         | latitude LONG) COMMENT "This is a GeoTable"
+         | STORED AS carbondata
+         | TBLPROPERTIES ('SPATIAL_INDEX'='spatial',
+         | 'SPATIAL_INDEX.spatial.type'='geohash',
+         | 'SPATIAL_INDEX.spatial.sourcecolumns'='longitude, latitude',
+         | 'SPATIAL_INDEX.spatial.originLatitude'='39.832277',
+         | 'SPATIAL_INDEX.spatial.gridSize'='60',
+         | 'SPATIAL_INDEX.spatial.minLongitude'='115.811865',
+         | 'SPATIAL_INDEX.spatial.maxLongitude'='116.782233',
+         | 'SPATIAL_INDEX.spatial.minLatitude'='39.832277',
+         | 'SPATIAL_INDEX.spatial.maxLatitude'='40.225281',
+         | 'SPATIAL_INDEX.spatial.conversionRatio'='1000000')
+       """.stripMargin)
+    loadData(sourceTable)
+    createTable(targetTable)
+    sql(s"insert into  $targetTable select * from $sourceTable")
+    checkAnswer(sql(s"select *from $targetTable where mygeohash = '2196036'"),
+      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+  }
+
+  test("test insert into non-geo table select from geo table") {
+    val sourceTable = table1;
+    val targetTable = table2;
+    createTable(sourceTable)
+    loadData(sourceTable)
+    sql(
+      s"""
+          CREATE TABLE IF NOT EXISTS $targetTable
+          (spatial Long, time Bigint, longitude Long, latitude Long)
+          STORED AS carbondata
+        """)
+    sql(s"insert into  $targetTable select * from $sourceTable")
+    checkAnswer(
+      sql(s"select * from $targetTable where spatial='2196036'"),
+      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+  }
+
   test("test insert into table select from another table with target table 
sort scope as global") {
     val sourceTable = table1;
     val targetTable = table2;
@@ -127,20 +276,49 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 
   test("test block pruning for polygon query") {
     createTable()
-    sql(s"insert into $table1 select 1575428400000,116285807,40084087")
-    sql(s"insert into $table1 select 1575428400000,116372142,40129503")
-    sql(s"insert into $table1 select 1575428400000,116187332,39979316")
-    sql(s"insert into $table1 select 1575428400000,116337069,39951887")
-    sql(s"insert into $table1 select 1575428400000,116359102,40154684")
-    sql(s"insert into $table1 select 1575428400000,116736367,39970323")
-    sql(s"insert into $table1 select 1575428400000,116362699,39942444")
-    sql(s"insert into $table1 select 1575428400000,116325378,39963129")
-    sql(s"insert into $table1 select 1575428400000,116302895,39930753")
-    sql(s"insert into $table1 select 1575428400000,116288955,39999101")
-    val df = sql(s"select longitude, latitude from $table1 where 
IN_POLYGON('116.321011 " +
+    sql(s"insert into $table1 select 0,1575428400000,116285807,40084087")
+    sql(s"insert into $table1 select 0,1575428400000,116372142,40129503")
+    sql(s"insert into $table1 select 0,1575428400000,116187332,39979316")
+    sql(s"insert into $table1 select 0,1575428400000,116337069,39951887")
+    sql(s"insert into $table1 select 0,1575428400000,116359102,40154684")
+    sql(s"insert into $table1 select 0,1575428400000,116736367,39970323")
+    sql(s"insert into $table1 select 0,1575428400000,116362699,39942444")
+    sql(s"insert into $table1 select 0,1575428400000,116325378,39963129")
+    sql(s"insert into $table1 select 0,1575428400000,116302895,39930753")
+    sql(s"insert into $table1 select 0,1575428400000,116288955,39999101")
+    val df = sql(s"select * from $table1 where IN_POLYGON('116.321011 " +
                  s"40.123503, 116.137676 39.947911, 116.560993 39.935276, 
116.321011 40.123503')")
     assert(df.rdd.getNumPartitions == 6)
-    checkAnswer(df, result)
+    checkAnswer(df, Seq(Row(733215, 1575428400000L, 116187332, 39979316),
+      Row(2160019, 1575428400000L, 116362699, 39942444),
+      Row(2170151, 1575428400000L, 116288955, 39999101),
+      Row(2174509, 1575428400000L, 116325378, 39963129),
+      Row(2196036, 1575428400000L, 116337069, 39951887),
+      Row(2361256, 1575428400000L, 116285807, 40084087)))
+  }
+
+  test("test insert into on table partitioned by timevalue column") {
+    sql(
+      s"""
+         | CREATE TABLE $table1(
+         | longitude LONG,
+         | latitude LONG) COMMENT "This is a GeoTable" PARTITIONED BY 
(timevalue BIGINT)
+         | STORED AS carbondata
+         | TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
+         | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+         | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+         | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+         | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+         | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+         | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+         | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+       """.stripMargin)
+    sql(s"insert into $table1 select 0, 116337069, 39951887, 1575428400000")
+    checkAnswer(
+      sql(s"select * from $table1 where mygeohash = '2196036'"),
+      Seq(Row(2196036, 116337069, 39951887, 1575428400000L)))
   }
 
   test("test polygon query on table partitioned by timevalue column") {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 5a82e7a..4ad4cce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -18,7 +18,9 @@
 package org.apache.carbondata.processing.loading.converter.impl;
 
 import java.util.List;
+import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -70,9 +72,17 @@ public class FieldEncoderFactory {
   public FieldConverter createFieldEncoder(
       DataField dataField, int index, String nullFormat, boolean isEmptyBadRecord,
       boolean isConvertToBinary, String binaryDecoder, CarbonDataLoadConfiguration configuration) {
+    String spatialProperty = null;
+    if (configuration != null) {
+      Map<String, String> properties =
+          configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+              .getTableProperties();
+      spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
+    }
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimension()) {
-      if (dataField.getColumn().isSpatialColumn()) {
+      if (spatialProperty != null && dataField.getColumn().getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
         return new SpatialIndexFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord,
             configuration);
       } else if (dataField.getColumn().getDataType() == DataTypes.DATE &&
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 2b6657c..c89b932 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -19,8 +19,10 @@ package org.apache.carbondata.processing.loading.converter.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -83,14 +85,18 @@ public class RowConverterImpl implements RowConverter {
     List<FieldConverter> fieldConverterList = new ArrayList<>();
     List<FieldConverter> nonSchemaFieldConverterList = new ArrayList<>();
     long lruCacheStartTime = System.currentTimeMillis();
-
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], i, nullFormat, isEmptyBadRecord, isConvertToBinary,
               (String) configuration.getDataLoadProperty(
                   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER),
               configuration);
-      if (fields[i].getColumn().isSpatialColumn()) {
+      if (spatialProperty != null && fields[i].getColumn().getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
         nonSchemaFieldConverterList.add(fieldConverter);
       } else {
         fieldConverterList.add(fieldConverter);
@@ -107,9 +113,17 @@ public class RowConverterImpl implements RowConverter {
   public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
     logHolder.setLogged(false);
     logHolder.clear();
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
+    boolean isSpatialColumn = false;
     for (int i = 0; i < fieldConverters.length; i++) {
-      if (configuration.isNonSchemaColumnsPresent() && 
!fieldConverters[i].getDataField()
-          .getColumn().isSpatialColumn()) {
+      if (spatialProperty != null) {
+        isSpatialColumn = fieldConverters[i].getDataField().getColumn().getColName()
+            .equalsIgnoreCase(spatialProperty.trim());
+      }
+      if (configuration.isNonSchemaColumnsPresent() && !isSpatialColumn) {
         // Skip the conversion for schema columns if the conversion is required only for non-schema
         // columns
         continue;
@@ -156,13 +170,18 @@ public class RowConverterImpl implements RowConverter {
     boolean isEmptyBadRecord = Boolean.parseBoolean(
         configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
             .toString());
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], i, nullFormat, isEmptyBadRecord, isConvertToBinary,
               (String) configuration.getDataLoadProperty(
                   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER),
               configuration);
-      if (fields[i].getColumn().isSpatialColumn()) {
+      if (spatialProperty != null && fields[i].getColumn().getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
         nonSchemaFieldConverterList.add(fieldConverter);
       } else {
         fieldConverterList.add(fieldConverter);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 45d62c9..23caf31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -18,7 +18,9 @@
 package org.apache.carbondata.processing.loading.parser.impl;
 
 import java.util.ArrayList;
+import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
@@ -72,10 +74,15 @@ public class RowParserImpl implements RowParser {
     numberOfColumns = header.length;
     DataField[] input = new DataField[fields.length];
     inputMapping = new int[input.length];
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     int k = 0;
     for (int i = 0; i < fields.length; i++) {
-      if (fields[i].getColumn().isSpatialColumn()) {
-        // Index columns are non-schema fields. They are not present in the header. So set
+      if (spatialProperty != null && fields[i].getColumn().getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
+        // Spatial index columns are not present in the header. So set
         // the input mapping as -1 for the field and continue
         input[k] = fields[i];
         inputMapping[k] = -1;
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 21a4bcc..c8fa656 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -366,8 +366,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
     private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
       Object[] newData = new Object[dataFields.length];
+      Map<String, String> properties =
+          configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+              .getTableProperties();
+      String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
       for (int i = 0; i < dataFields.length; i++) {
-        if (dataFields[i].getColumn().isSpatialColumn()) {
+        if (spatialProperty != null && dataFields[i].getColumn().getColName()
+            .equalsIgnoreCase(spatialProperty.trim())) {
           continue;
         }
         if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
@@ -410,27 +415,31 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
     private Object[] convertToNoDictionaryToBytesWithoutReArrange(Object[] data,
         DataField[] dataFields) {
       Object[] newData = new Object[dataFields.length];
+      Map<String, String> properties =
+          configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+              .getTableProperties();
+      String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
       // now dictionary is removed, no need of no dictionary mapping
-      for (int i = 0, index = 0; i < dataFields.length; i++) {
-        if (dataFields[i].getColumn().isSpatialColumn()) {
+      for (int i = 0; i < dataFields.length; i++) {
+        if (spatialProperty != null && dataFields[i].getColumn().getColName()
+            .equalsIgnoreCase(spatialProperty.trim())) {
           continue;
         }
         if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
           // keep the no dictionary measure column as original data
-          newData[i] = data[index];
+          newData[i] = data[i];
         } else if (dataTypes[i].isComplexType()) {
-          getComplexTypeByteArray(newData, i, data, dataFields[i], index, true);
-        } else if (dataTypes[i] == DataTypes.DATE && data[index] instanceof 
Long) {
+          getComplexTypeByteArray(newData, i, data, dataFields[i], i, true);
+        } else if (dataTypes[i] == DataTypes.DATE && data[i] instanceof Long) {
           if (dateDictionaryGenerator == null) {
             dateDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
                 .getDirectDictionaryGenerator(dataTypes[i], dataFields[i].getDateFormat());
           }
-          newData[i] = dateDictionaryGenerator.generateKey((long) data[index]);
+          newData[i] = dateDictionaryGenerator.generateKey((long) data[i]);
         } else {
           newData[i] =
-              DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(data[index], dataTypes[i]);
+              DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(data[i], dataTypes[i]);
         }
-        index++;
       }
       return newData;
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index d8a22a7..474a54e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -333,8 +333,12 @@ public final class CarbonDataProcessorUtil {
     Set<String> columnNames = new 
HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<CarbonDimension> dimensions =
         schema.getCarbonTable().getVisibleDimensions();
+    Map<String, String> properties =
+        schema.getCarbonTable().getTableInfo().getFactTable().getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     for (CarbonDimension dimension : dimensions) {
-      if (!dimension.isSpatialColumn()) {
+      if (spatialProperty != null && !dimension.getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
         // skip the non-schema column
         columnNames.add(dimension.getColName());
       }
