This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b00efca  [CARBONDATA-4149] Fix query issues after alter add partition.
b00efca is described below

commit b00efca47689dbf2d3b55bdd8ea60944c7f118c0
Author: ShreelekhyaG <[email protected]>
AuthorDate: Sun Mar 14 00:15:43 2021 +0530

    [CARBONDATA-4149] Fix query issues after alter add partition.
    
    Why is this PR needed?
    A query that uses a secondary index (SI) on a partition table gives
    incorrect results after a partition is added based on location.
    1. While pruning, an external segment should use ExternalSegmentResolver;
       there is no need to use ImplicitIncludeFilterExecutor, because an
       external segment is not added to the SI table.
    2. If the partition table has external partitions, after compaction the new 
files are loaded
       to the external path.
    3. Data is not loaded to the child table (MV) after executing the add
       partition command.
    
    What changes were proposed in this PR?
    1. Add the path to loadMetadataDetails for an external partition. The path
       is used to identify the segment as an external segment.
    2. After compaction, so that no link to the external partition is
       maintained, the compacted files are added as a new partition in the
       table. To update the partition spec details in the hive metastore,
       (drop partition + add partition) operations are performed.
    3. Add Load Pre and Post listeners in
       CarbonAlterTableAddHivePartitionCommand to trigger the data load to the
       materialized view.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4107
---
 .../carbondata/core/indexstore/PartitionSpec.java  |  5 ++
 .../secondaryindex/TestSIWithPartition.scala       | 84 +++++++++++++++++++++-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 25 ++++++-
 .../spark/rdd/CarbonTableCompactor.scala           | 43 ++++++++++-
 .../CarbonAlterTableAddHivePartitionCommand.scala  | 24 +++++++
 .../view/rewrite/TestPartitionWithMV.scala         | 30 ++++++++
 .../processing/merger/CarbonDataMergerUtil.java    |  4 +-
 .../processing/util/CarbonDataProcessorUtil.java   | 10 +++
 8 files changed, 218 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
index 9111ce2..0eb4539 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
@@ -76,6 +76,11 @@ public class PartitionSpec implements Serializable, Writable 
{
     return locationPath;
   }
 
+  public void setLocation(String location) {
+    locationPath = new Path(location);
+    this.location = location;
+  }
+
   public String getUuid() {
     return uuid;
   }
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
index dbc758b..19bb6ce 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
@@ -16,10 +16,14 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.sdk.file.CarbonWriter
 import 
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 
 class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
@@ -380,6 +384,84 @@ class TestSIWithPartition extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists partition_table")
   }
 
+  test("test si with add partition based on location on partition table") {
+    sql("drop table if exists partition_table")
+    sql("create table partition_table (id int,name String) " +
+        "partitioned by(email string) stored as carbondata")
+    sql("insert into partition_table select 1,'blue','abc'")
+    sql("CREATE INDEX partitionTable_si  on table partition_table (name) as 
'carbondata'")
+    val schemaFile =
+      CarbonTablePath.getSchemaFilePath(
+        CarbonEnv.getCarbonTable(None, 
"partition_table")(sqlContext.sparkSession).getTablePath)
+    val sdkWritePath = target + "/" + "def"
+    
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+    val writer = CarbonWriter.builder()
+      .outputPath(sdkWritePath)
+      .writtenBy("test")
+      .withSchemaFile(schemaFile)
+      .withCsvInput()
+      .build()
+    writer.write(Seq("2", "red", "def").toArray)
+    writer.write(Seq("3", "black", "def").toArray)
+    writer.close()
+    sql(s"alter table partition_table add partition (email='def') location 
'$sdkWritePath'")
+    var extSegmentQuery = sql("select * from partition_table where name = 
'red'")
+    checkAnswer(extSegmentQuery, Row(2, "red", "def"))
+    sql("insert into partition_table select 4,'grey','bcd'")
+    sql("insert into partition_table select 5,'red','abc'")
+    sql("alter table partition_table compact 'minor'")
+    extSegmentQuery = sql("select * from partition_table where name = 'red'")
+    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def"), Row(5, "red", 
"abc")))
+    
assert(extSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
+    sql("drop table if exists partition_table")
+  }
+
+  test("test si with add multiple partitions based on location on partition 
table") {
+    sql("drop table if exists partition_table")
+    sql("create table partition_table (id int,name String) " +
+        "partitioned by(email string, age int) stored as carbondata")
+    sql("insert into partition_table select 1,'blue','abc', 20")
+    sql("CREATE INDEX partitionTable_si  on table partition_table (name) as 
'carbondata'")
+    val schemaFile =
+      CarbonTablePath.getSchemaFilePath(
+        CarbonEnv.getCarbonTable(None, 
"partition_table")(sqlContext.sparkSession).getTablePath)
+    val sdkWritePath1 = target + "/" + "def"
+    
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath1))
+    var writer = CarbonWriter.builder()
+      .outputPath(sdkWritePath1)
+      .writtenBy("test")
+      .withSchemaFile(schemaFile)
+      .withCsvInput()
+      .build()
+    writer.write(Seq("2", "red", "def", "25").toArray)
+    writer.write(Seq("3", "black", "def", "25").toArray)
+    writer.close()
+    val sdkWritePath2 = target + "/" + "def2"
+    
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath2))
+    writer = CarbonWriter.builder()
+      .outputPath(sdkWritePath2)
+      .writtenBy("test")
+      .withSchemaFile(schemaFile)
+      .withCsvInput()
+      .build()
+    writer.write(Seq("2", "red", "def2", "22").toArray)
+    writer.write(Seq("3", "black", "def2", "22").toArray)
+    writer.close()
+    sql(
+      s"alter table partition_table add partition (email='def', age='25') 
location " +
+      s"'$sdkWritePath1' partition (email='def2', age ='22') location 
'$sdkWritePath2'")
+    var extSegmentQuery = sql("select * from partition_table where name = 
'red'")
+      checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def", 25), Row(2, "red", 
"def2", 22)))
+    sql("insert into partition_table select 4,'grey','bcd',23")
+    sql("insert into partition_table select 5,'red','abc',22")
+    sql("alter table partition_table compact 'minor'")
+    extSegmentQuery = sql("select * from partition_table where name = 'red'")
+    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def", 25),
+      Row(2, "red", "def2", 22), Row(5, "red", "abc", 22)))
+    
assert(extSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
+    sql("drop table if exists partition_table")
+  }
+
   override protected def afterAll(): Unit = {
     sql("drop index if exists indextable1 on uniqdata1")
     sql("drop table if exists uniqdata1")
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 061ce90..7e6d5f3 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.util
 import java.util.{Collections, List}
 import java.util.concurrent.atomic.AtomicInteger
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.reflect.classTag
+import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.hadoop.mapreduce.{InputSplit, Job}
 import org.apache.spark._
@@ -98,6 +99,27 @@ class CarbonMergerRDD[K, V](
     broadCastSplits = sparkContext.broadcast(new 
CarbonInputSplitWrapper(splits))
   }
 
+  // checks for added partition specs with external path.
+  // after compaction, location path to be updated with table path.
+  def checkAndUpdatePartitionLocation(partitionSpec: PartitionSpec) : 
PartitionSpec = {
+    breakable {
+      if (partitionSpec != null) {
+        carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaDetail 
=> {
+          if (loadMetaDetail.getPath != null &&
+              
loadMetaDetail.getPath.split(",").contains(partitionSpec.getLocation.toString)) 
{
+            val updatedPartitionLocation = CarbonDataProcessorUtil
+              .createCarbonStoreLocationForPartition(
+                carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+                partitionSpec.getPartitions.toArray.mkString(File.separator))
+            partitionSpec.setLocation(updatedPartitionLocation)
+            break()
+          }
+        })
+      }
+    }
+    partitionSpec
+  }
+
   override def internalCompute(theSplit: Partition, context: TaskContext): 
Iterator[(K, V)] = {
     val queryStartTime = System.currentTimeMillis()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -210,6 +232,7 @@ class CarbonMergerRDD[K, V](
         val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
           carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)
 
+       checkAndUpdatePartitionLocation(partitionSpec)
         if (carbonTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
           rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() 
== 0) {
 
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 5db344b..68edd09 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -19,16 +19,19 @@ package org.apache.carbondata.spark.rdd
 
 import java.util
 import java.util.{Collections, List}
-import java.util.concurrent.ExecutorService
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
+import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.spark.sql.{CarbonThreadUtil, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, 
CompactionCallableModel, CompactionModel}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, 
AlterTableDropPartitionCommand, CarbonMergerMapping, CompactionCallableModel, 
CompactionModel}
 import org.apache.spark.sql.execution.command.management.CommonLoadUtils
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.{CollectionAccumulator, MergeIndexUtil}
 
@@ -36,6 +39,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, 
LockUsage}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -247,6 +251,7 @@ class CarbonTableCompactor(
       .sparkContext
       .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
 
+    val updatePartitionSpecs : List[PartitionSpec] = new 
util.ArrayList[PartitionSpec]
     var mergeRDD: CarbonMergerRDD[String, Boolean] = null
     if (carbonTable.isHivePartitionTable) {
       // collect related partitions
@@ -263,6 +268,19 @@ class CarbonTableCompactor(
       if (partitionSpecs != null && partitionSpecs.nonEmpty) {
         compactionCallableModel.compactedPartitions = Some(partitionSpecs)
       }
+      partitionSpecs.foreach(partitionSpec => {
+        breakable {
+          
carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaDetail => {
+            if (loadMetaDetail.getPath != null &&
+                
loadMetaDetail.getPath.split(",").contains(partitionSpec.getLocation.toString)) 
{
+              // if partition spec added is external path,
+              // after compaction location path to be updated with table path.
+              updatePartitionSpecs.add(partitionSpec)
+              break()
+            }
+          })
+        }
+      })
     }
 
     val mergeStatus =
@@ -276,7 +294,26 @@ class CarbonTableCompactor(
           segmentMetaDataAccumulator)
       } else {
         if (mergeRDD != null) {
-          mergeRDD.collect
+          val result = mergeRDD.collect
+          if (!updatePartitionSpecs.isEmpty) {
+            val tableIdentifier = new TableIdentifier(carbonTable.getTableName,
+              Some(carbonTable.getDatabaseName))
+            val partitionSpecs = updatePartitionSpecs.asScala.map {
+              partitionSpec =>
+                // replaces old partitionSpec with updated partitionSpec
+                mergeRDD.checkAndUpdatePartitionLocation(partitionSpec)
+                PartitioningUtils.parsePathFragment(
+                  String.join(CarbonCommonConstants.FILE_SEPARATOR, 
partitionSpec.getPartitions))
+            }
+            // To update partitionSpec in hive metastore, drop and add with 
latest path.
+            AlterTableDropPartitionCommand(
+              tableIdentifier,
+              partitionSpecs,
+              true, false, true).run(sqlContext.sparkSession)
+            AlterTableAddPartitionCommand(tableIdentifier,
+              partitionSpecs.map(p => (p, None)), 
false).run(sqlContext.sparkSession)
+          }
+          result
         } else {
           new CarbonMergerRDD(
             sc.sparkSession,
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 26ef3b7..7933a25 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, 
AlterTableDropPartitionCommand, AlterTableModel, AtomicRunnableCommand}
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -125,6 +126,19 @@ case class CarbonAlterTableAddHivePartitionCommand(
         loadModel.setColumnCompressor(columnCompressor)
         loadModel.setCarbonTransactionalTable(true)
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
+        // create operationContext to fire load events
+        val operationContext: OperationContext = new OperationContext
+        val (tableIndexes, indexOperationContext) = 
CommonLoadUtils.firePreLoadEvents(
+          sparkSession = sparkSession,
+          carbonLoadModel = loadModel,
+          uuid = "",
+          factPath = "",
+          null,
+          null,
+          isOverwriteTable = false,
+          isDataFrame = false,
+          updateModel = None,
+          operationContext = operationContext)
         // Create new entry in tablestatus file
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
         val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
@@ -133,6 +147,9 @@ case class CarbonAlterTableAddHivePartitionCommand(
             loadModel.getSegmentId, 
String.valueOf(loadModel.getFactTimeStamp)) +
           CarbonTablePath.SEGMENT_EXT
         newMetaEntry.setSegmentFile(segmentFileName)
+        // set path to identify it as external added partition
+        newMetaEntry.setPath(partitionSpecsAndLocsTobeAdded.asScala
+          .map(_.getLocation.toString).mkString(","))
         val segmentsLoc = 
CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
         CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
         val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + 
segmentFileName
@@ -169,6 +186,13 @@ case class CarbonAlterTableAddHivePartitionCommand(
           customSegmentIds = customSegmentIds)
         val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, 
alterTableModel)
         OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new 
OperationContext)
+        // fire event to load data to materialized views
+        CommonLoadUtils.firePostLoadEvents(sparkSession,
+          loadModel,
+          tableIndexes,
+          indexOperationContext,
+          table,
+          operationContext)
       }
     }
     Seq.empty[Row]
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
index 4c41b3a..229d6df 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
@@ -26,6 +26,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.sdk.file.CarbonWriter
 
 /**
  * Test class for MV to verify partition scenarios
@@ -770,5 +772,33 @@ class TestPartitionWithMV extends QueryTest with 
BeforeAndAfterAll with BeforeAn
     assert(sql("select timeseries(b,'day'),c from partitionone group by 
timeseries(b,'day'),c").collect().length == 1)
     sql("drop table if exists partitionone")
   }
+
+  test("test mv with add partition based on location on partition table") {
+    sql("drop table if exists partition_table")
+    sql("create table partition_table (id int,name String) " +
+        "partitioned by(email string) stored as carbondata")
+    sql("drop materialized view if exists partitiontable_mv")
+    sql("CREATE materialized view partitiontable_mv as select name from 
partition_table")
+    sql("insert into partition_table select 1,'blue','abc'")
+    val schemaFile =
+      CarbonTablePath.getSchemaFilePath(
+        CarbonEnv.getCarbonTable(None, 
"partition_table")(sqlContext.sparkSession).getTablePath)
+    val sdkWritePath = target + "/" + "def"
+    
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+    val writer = CarbonWriter.builder()
+      .outputPath(sdkWritePath)
+      .writtenBy("test")
+      .withSchemaFile(schemaFile)
+      .withCsvInput()
+      .build()
+    writer.write(Seq("2", "red", "def").toArray)
+    writer.write(Seq("3", "black", "def").toArray)
+    writer.close()
+    sql(s"alter table partition_table add partition (email='def') location 
'$sdkWritePath'")
+    val extSegmentQuery = sql("select name from partition_table")
+    assert(TestUtil.verifyMVHit(extSegmentQuery.queryExecution.optimizedPlan, 
"partitiontable_mv"))
+    checkAnswer(extSegmentQuery, Seq(Row("blue"), Row("red"), Row("black")))
+    sql("drop table if exists partition_table")
+  }
   // scalastyle:on lineLength
 }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 5576133..955469d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -824,9 +824,9 @@ public final class CarbonDataMergerUtil {
       //check if this load is an already merged load.
       if (null != segment.getMergedLoadName()) {
         segments
-            .add(Segment.getSegment(segment.getMergedLoadName(), 
segment.getSegmentFile(), null));
+            .add(new Segment(segment.getMergedLoadName(), 
segment.getSegmentFile(), null, segment));
       } else {
-        segments.add(Segment.getSegment(segment.getLoadName(), 
segment.getSegmentFile(), null));
+        segments.add(new Segment(segment.getLoadName(), 
segment.getSegmentFile(), null, segment));
       }
     }
     return segments;
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 3a68410..dcf70b0 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -687,6 +687,16 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * This method will get the store location for the given path, partition spec
+   *
+   * @return data directory path
+   */
+  public static String createCarbonStoreLocationForPartition(CarbonTable 
carbonTable,
+      String partition) {
+    return carbonTable.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + 
partition;
+  }
+
+  /**
    * initialise data type for measures for their storage format
    */
   public static DataType[] initDataType(CarbonTable carbonTable, String 
tableName,

Reply via email to