This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ccaf47  [CARBONDATA-3801][CARBONDATA-3805][CARBONDATA-3809] Query on 
partition table with SI having multiple partition columns gives empty results
5ccaf47 is described below

commit 5ccaf4703b99a9903723df57d4629fe6e135e25c
Author: Indhumathi27 <[email protected]>
AuthorDate: Wed May 6 16:15:38 2020 +0530

    [CARBONDATA-3801][CARBONDATA-3805][CARBONDATA-3809] Query on partition 
table with SI having multiple partition
    columns gives empty results
    
    Why is this PR needed?
    1. Query on partition table with SI having multiple partition columns gives 
empty results. Because while loading data to SI table,
    blockid -> partition directory path is replaced by '#' instead of '/' to 
support multi level partitioning. Hence block id will be
    like part1=1#part2=2/xxxxxxxxx. During query the above block id is compared 
with actual block id part1=1/part2=2/xxxxxxxxx,
    which do not match, and provides empty results.
    2. Drop Index throws exception while starting beeline, that error while 
editing schema file
    3. Refresh syntax in SI doc is wrong.
    
    What changes were proposed in this PR?
    1. During query, convert block id as in Load flow in case of partition 
table, to support multi level partitioning.
    2. Remove modifying schema mdt file in index flow.
    3. Updated Refresh syntax
    
    This closes #3748
---
 .../core/indexstore/blockletindex/BlockIndex.java  | 12 +++++--
 docs/index/index-management.md                     |  6 ++++
 docs/index/secondary-index-guide.md                |  4 +--
 .../secondaryindex/TestSIWithPartition.scala       | 23 ++++++++++++
 .../sql/hive/CarbonHiveIndexMetadataUtil.scala     |  4 ---
 .../apache/spark/sql/index/CarbonIndexUtil.scala   |  4 +--
 .../sql/secondaryindex/util/FileInternalUtil.scala | 41 ----------------------
 7 files changed, 42 insertions(+), 52 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 170d383..3ab3cd4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -63,6 +63,7 @@ import 
org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecu
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.BlockletIndexUtil;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -793,9 +794,14 @@ public class BlockIndex extends CoarseGrainIndex
     BitSet bitSet = null;
     if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
       String uniqueBlockPath;
-      if (segmentPropertiesWrapper.getCarbonTable().isHivePartitionTable()) {
-        uniqueBlockPath = filePath
-            
.substring(segmentPropertiesWrapper.getCarbonTable().getTablePath().length() + 
1);
+      CarbonTable carbonTable = segmentPropertiesWrapper.getCarbonTable();
+      if (carbonTable.isHivePartitionTable()) {
+        // While data loading to SI created on Partition table, on partition 
directory, '/' will be
+        // replaced with '#', to support multi level partitioning. For 
example, BlockId will be
+        // look like `part1=1#part2=2/xxxxxxxxx`. During query also, blockId 
should be
+        // replaced by '#' in place of '/', to match and prune data on SI 
table.
+        uniqueBlockPath = CarbonUtil
+            .getBlockId(carbonTable.getAbsoluteTableIdentifier(), filePath, 
"", true, false, true);
       } else {
         uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 
1);
       }
diff --git a/docs/index/index-management.md b/docs/index/index-management.md
index 7bd9c75..b56f389 100644
--- a/docs/index/index-management.md
+++ b/docs/index/index-management.md
@@ -39,6 +39,12 @@ AS carbondata/bloomfilter/lucene
 [PROPERTIES ('key'='value')]
 ```
 
+Index can be refreshed using following DDL
+
+```
+REFRESH INDEX index_name ON [TABLE] [db_name.]table_name
+```
+
 Currently, there are 3 Index implementations in CarbonData.
 
 | Index Provider   | Description                                               
                       | Management |
diff --git a/docs/index/secondary-index-guide.md 
b/docs/index/secondary-index-guide.md
index 1d86b82..cf22097 100644
--- a/docs/index/secondary-index-guide.md
+++ b/docs/index/secondary-index-guide.md
@@ -142,12 +142,12 @@ Where there are so many small files present in the SI 
table, then we can use the
 compact the files within an SI segment to avoid many small files.
 
   ```
-  REFRESH INDEX sales_index
+  REFRESH INDEX sales_index ON TABLE sales
   ```
 This command merges data files in each segment of the SI table.
 
   ```
-  REFRESH INDEX sales_index WHERE SEGMENT.ID IN(1)
+  REFRESH INDEX sales_index ON TABLE sales WHERE SEGMENT.ID IN(1)
   ```
 This command merges data files within a specified segment of the SI table.
 
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
index 3581211..31bd4a2 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
@@ -356,8 +356,31 @@ class TestSIWithPartition extends QueryTest with 
BeforeAndAfterAll {
     }
   }
 
+  test("test secondary index with partition table having mutiple partition 
columns") {
+    sql("drop table if exists partition_table")
+    sql(s"""
+         | CREATE TABLE partition_table (stringField string, intField int, 
shortField short, stringField1 string)
+         | STORED AS carbondata
+         | PARTITIONED BY (hour_ string, date_ string, sec_ string)
+         | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 
'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"drop index if exists si_on_multi_part on partition_table")
+    sql(s"create index si_on_multi_part on partition_table(stringField1) as 
'carbondata'")
+    sql("insert into partition_table select 'abc', 1,123,'abc1',2,'mon','ten'")
+    checkAnswer(sql(s"select count(*) from si_on_multi_part"), Seq(Row(1)))
+    val dataFrame = sql(s"select stringField,date_,sec_ from partition_table 
where stringField1='abc1'")
+    checkAnswer(dataFrame, Seq(Row("abc","mon","ten")))
+    if (!isFilterPushedDownToSI(dataFrame.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    sql("drop table if exists partition_table")
+  }
+
   override protected def afterAll(): Unit = {
     sql("drop index if exists indextable1 on uniqdata1")
     sql("drop table if exists uniqdata1")
+    sql("drop table if exists partition_table")
   }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala
index 57fb05e..a7871fb 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.index.CarbonIndexUtil
-import org.apache.spark.sql.secondaryindex.util.FileInternalUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -106,9 +105,6 @@ object CarbonHiveIndexMetadataUtil {
     sparkSession.sql(
       s"""ALTER TABLE $dbName.$parentTableName SET SERDEPROPERTIES 
('indexInfo'='$newIndexInfo')
         """.stripMargin).collect()
-    FileInternalUtil.touchSchemaFileTimestamp(dbName, parentTableName,
-      parentCarbonTable.getTablePath, System.currentTimeMillis())
-    FileInternalUtil.touchStoreTimeStamp()
     refreshTable(dbName, parentTableName, sparkSession)
   }
 
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index f346ea1..6213caf 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -99,7 +99,7 @@ object CarbonIndexUtil {
   def getCGAndFGIndexes(carbonTable: CarbonTable): java.util.Map[String,
     util.Map[String, util.Map[String, String]]] = {
     val indexMetadata = carbonTable.getIndexMetadata
-    val cgAndFgIndexes = if (null != indexMetadata) {
+    val cgAndFgIndexes = if (null != indexMetadata && null != 
indexMetadata.getIndexesMap) {
       val indexesMap = indexMetadata.getIndexesMap
       indexesMap.asScala.filter(provider =>
         
!provider._1.equalsIgnoreCase(IndexType.SI.getIndexProviderName)).asJava
@@ -236,7 +236,7 @@ object CarbonIndexUtil {
   def getIndexCarbonTables(carbonTable: CarbonTable,
       sparkSession: SparkSession): Seq[CarbonTable] = {
     val indexMetadata = carbonTable.getIndexMetadata
-    val siIndexesMap = if (null != indexMetadata) {
+    val siIndexesMap = if (null != indexMetadata && null != 
indexMetadata.getIndexesMap) {
       indexMetadata.getIndexesMap.get(IndexType.SI.getIndexProviderName)
     } else {
       new util.HashMap[String, util.Map[String, util.Map[String, String]]]()
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/FileInternalUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/FileInternalUtil.scala
index 1be3b3d..46c318e 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/FileInternalUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/FileInternalUtil.scala
@@ -23,14 +23,10 @@ import 
org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.index.CarbonIndexUtil
 
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
@@ -39,30 +35,6 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
  */
 object FileInternalUtil {
 
-  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * This method will check and create an empty schema timestamp file
-   *
-   * @return
-   */
-  def touchStoreTimeStamp(): Long = {
-    val timestampFile = getTimestampFileAndType()
-    val systemTime = System.currentTimeMillis()
-    FileFactory.getCarbonFile(timestampFile)
-      .setLastModifiedTime(systemTime)
-    systemTime
-  }
-
-  private def getTimestampFileAndType() = {
-    // if mdt file path is configured then take configured path else take 
default path
-    val configuredMdtPath = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
-        CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
-    val timestampFile = configuredMdtPath + "/" + 
CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
-    CarbonUtil.checkAndAppendFileSystemURIScheme(timestampFile)
-  }
-
   def updateTableStatus(
     validSegments: List[String],
     databaseName: String,
@@ -114,17 +86,4 @@ object FileInternalUtil {
     )
     status
   }
-
-
-
-  def touchSchemaFileTimestamp(dbName: String,
-      tableName: String,
-      tablePath: String,
-      schemaTimeStamp: Long): Unit = {
-    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
-    if (FileFactory.isFileExist(tableMetadataFile)) {
-      FileFactory.getCarbonFile(tableMetadataFile)
-        .setLastModifiedTime(schemaTimeStamp)
-    }
-  }
 }

Reply via email to