This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new b00efca [CARBONDATA-4149] Fix query issues after alter add partition.
b00efca is described below
commit b00efca47689dbf2d3b55bdd8ea60944c7f118c0
Author: ShreelekhyaG <[email protected]>
AuthorDate: Sun Mar 14 00:15:43 2021 +0530
[CARBONDATA-4149] Fix query issues after alter add partition.
Why is this PR needed?
A query with SI on a partition table gives incorrect results after adding a partition based on location (a reproduction sketch follows this list).
1. While pruning, an external segment should use ExternalSegmentResolver, and there is no need to use ImplicitIncludeFilterExecutor, because an external segment is not added to the SI table.
2. If the partition table has external partitions, compaction loads the new files to the external path.
3. Data is not loaded to the child table (MV) after executing the add partition command.
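A minimal reproduction sketch in the style of the tests added in this PR; the table, index, and partition location names below are illustrative:

    // partition table with a secondary index on a non-partition column
    sql("create table partition_table (id int, name string) " +
      "partitioned by (email string) stored as carbondata")
    sql("create index partitiontable_si on table partition_table (name) as 'carbondata'")
    sql("insert into partition_table select 1, 'blue', 'abc'")
    // carbondata files for email='def' are written externally (for example with the
    // Carbon SDK) and then attached as a partition by location
    sql("alter table partition_table add partition (email='def') location '/tmp/external/def'")
    // before this fix, the SI-assisted filter query could return incorrect results,
    // because the external segment has no corresponding rows in the SI table
    sql("select * from partition_table where name = 'red'").show()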
What changes were proposed in this PR?
1. Add the partition path to loadMetadataDetails for an external partition; it is used to identify the segment as an external segment.
2. After compaction, to avoid keeping any link to the external partition, the compacted files are added as a new partition in the table. To update the partition spec details in the Hive metastore, drop partition + add partition operations are performed (a sketch of this step follows this list).
3. Add Load Pre and Post listeners in CarbonAlterTableAddHivePartitionCommand to trigger the data load to the materialized view.
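A minimal sketch of the metastore update described in point 2, assuming Spark's AlterTableDropPartitionCommand/AlterTableAddPartitionCommand signatures as used in this patch; the helper name is illustrative and not part of the change:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
    import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}

    // Re-point a partition that was originally added from an external location:
    // drop the old metastore entry, then add the same spec back without an explicit
    // location so it resolves under the table path that now holds the compacted files.
    def repointPartition(
        session: SparkSession,
        table: TableIdentifier,
        spec: TablePartitionSpec): Unit = {
      // arguments: ifExists = true, purge = false, retainData = true
      AlterTableDropPartitionCommand(table, Seq(spec), true, false, true).run(session)
      // None means no explicit location, so it defaults to a path under the table path
      AlterTableAddPartitionCommand(table, Seq((spec, None)), false).run(session)
    }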
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4107
---
.../carbondata/core/indexstore/PartitionSpec.java | 5 ++
.../secondaryindex/TestSIWithPartition.scala | 84 +++++++++++++++++++++-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 25 ++++++-
.../spark/rdd/CarbonTableCompactor.scala | 43 ++++++++++-
.../CarbonAlterTableAddHivePartitionCommand.scala | 24 +++++++
.../view/rewrite/TestPartitionWithMV.scala | 30 ++++++++
.../processing/merger/CarbonDataMergerUtil.java | 4 +-
.../processing/util/CarbonDataProcessorUtil.java | 10 +++
8 files changed, 218 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
index 9111ce2..0eb4539 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
@@ -76,6 +76,11 @@ public class PartitionSpec implements Serializable, Writable {
return locationPath;
}
+ public void setLocation(String location) {
+ locationPath = new Path(location);
+ this.location = location;
+ }
+
public String getUuid() {
return uuid;
}
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
index dbc758b..19bb6ce 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
@@ -16,10 +16,14 @@
*/
package org.apache.carbondata.spark.testsuite.secondaryindex
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.sdk.file.CarbonWriter
import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
@@ -380,6 +384,84 @@ class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists partition_table")
}
+ test("test si with add partition based on location on partition table") {
+ sql("drop table if exists partition_table")
+ sql("create table partition_table (id int,name String) " +
+ "partitioned by(email string) stored as carbondata")
+ sql("insert into partition_table select 1,'blue','abc'")
+ sql("CREATE INDEX partitionTable_si on table partition_table (name) as
'carbondata'")
+ val schemaFile =
+ CarbonTablePath.getSchemaFilePath(
+ CarbonEnv.getCarbonTable(None, "partition_table")(sqlContext.sparkSession).getTablePath)
+ val sdkWritePath = target + "/" + "def"
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+ val writer = CarbonWriter.builder()
+ .outputPath(sdkWritePath)
+ .writtenBy("test")
+ .withSchemaFile(schemaFile)
+ .withCsvInput()
+ .build()
+ writer.write(Seq("2", "red", "def").toArray)
+ writer.write(Seq("3", "black", "def").toArray)
+ writer.close()
+ sql(s"alter table partition_table add partition (email='def') location
'$sdkWritePath'")
+ var extSegmentQuery = sql("select * from partition_table where name = 'red'")
+ checkAnswer(extSegmentQuery, Row(2, "red", "def"))
+ sql("insert into partition_table select 4,'grey','bcd'")
+ sql("insert into partition_table select 5,'red','abc'")
+ sql("alter table partition_table compact 'minor'")
+ extSegmentQuery = sql("select * from partition_table where name = 'red'")
+ checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def"), Row(5, "red", "abc")))
+ assert(extSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
+ sql("drop table if exists partition_table")
+ }
+
+ test("test si with add multiple partitions based on location on partition
table") {
+ sql("drop table if exists partition_table")
+ sql("create table partition_table (id int,name String) " +
+ "partitioned by(email string, age int) stored as carbondata")
+ sql("insert into partition_table select 1,'blue','abc', 20")
+ sql("CREATE INDEX partitionTable_si on table partition_table (name) as
'carbondata'")
+ val schemaFile =
+ CarbonTablePath.getSchemaFilePath(
+ CarbonEnv.getCarbonTable(None, "partition_table")(sqlContext.sparkSession).getTablePath)
+ val sdkWritePath1 = target + "/" + "def"
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath1))
+ var writer = CarbonWriter.builder()
+ .outputPath(sdkWritePath1)
+ .writtenBy("test")
+ .withSchemaFile(schemaFile)
+ .withCsvInput()
+ .build()
+ writer.write(Seq("2", "red", "def", "25").toArray)
+ writer.write(Seq("3", "black", "def", "25").toArray)
+ writer.close()
+ val sdkWritePath2 = target + "/" + "def2"
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath2))
+ writer = CarbonWriter.builder()
+ .outputPath(sdkWritePath2)
+ .writtenBy("test")
+ .withSchemaFile(schemaFile)
+ .withCsvInput()
+ .build()
+ writer.write(Seq("2", "red", "def2", "22").toArray)
+ writer.write(Seq("3", "black", "def2", "22").toArray)
+ writer.close()
+ sql(
+ s"alter table partition_table add partition (email='def', age='25')
location " +
+ s"'$sdkWritePath1' partition (email='def2', age ='22') location
'$sdkWritePath2'")
+ var extSegmentQuery = sql("select * from partition_table where name = 'red'")
+ checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def", 25), Row(2, "red", "def2", 22)))
+ sql("insert into partition_table select 4,'grey','bcd',23")
+ sql("insert into partition_table select 5,'red','abc',22")
+ sql("alter table partition_table compact 'minor'")
+ extSegmentQuery = sql("select * from partition_table where name = 'red'")
+ checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def", 25),
+ Row(2, "red", "def2", 22), Row(5, "red", "abc", 22)))
+ assert(extSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
+ sql("drop table if exists partition_table")
+ }
+
override protected def afterAll(): Unit = {
sql("drop index if exists indextable1 on uniqdata1")
sql("drop table if exists uniqdata1")
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 061ce90..7e6d5f3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.spark.rdd
-import java.io.IOException
+import java.io.{File, IOException}
import java.util
import java.util.{Collections, List}
import java.util.concurrent.atomic.AtomicInteger
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.classTag
+import scala.util.control.Breaks.{break, breakable}
import org.apache.hadoop.mapreduce.{InputSplit, Job}
import org.apache.spark._
@@ -98,6 +99,27 @@ class CarbonMergerRDD[K, V](
broadCastSplits = sparkContext.broadcast(new CarbonInputSplitWrapper(splits))
}
+ // checks for added partition specs with external path.
+ // after compaction, location path to be updated with table path.
+ def checkAndUpdatePartitionLocation(partitionSpec: PartitionSpec) : PartitionSpec = {
+ breakable {
+ if (partitionSpec != null) {
+ carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaDetail => {
+ if (loadMetaDetail.getPath != null &&
+ loadMetaDetail.getPath.split(",").contains(partitionSpec.getLocation.toString)) {
+ val updatedPartitionLocation = CarbonDataProcessorUtil
+ .createCarbonStoreLocationForPartition(
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ partitionSpec.getPartitions.toArray.mkString(File.separator))
+ partitionSpec.setLocation(updatedPartitionLocation)
+ break()
+ }
+ })
+ }
+ }
+ partitionSpec
+ }
+
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val queryStartTime = System.currentTimeMillis()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -210,6 +232,7 @@ class CarbonMergerRDD[K, V](
val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)
+ checkAndUpdatePartitionLocation(partitionSpec)
if (carbonTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() == 0) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 5db344b..68edd09 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -19,16 +19,19 @@ package org.apache.carbondata.spark.rdd
import java.util
import java.util.{Collections, List}
-import java.util.concurrent.ExecutorService
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
+import scala.util.control.Breaks.{break, breakable}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.spark.sql.{CarbonThreadUtil, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, CarbonMergerMapping, CompactionCallableModel, CompactionModel}
import org.apache.spark.sql.execution.command.management.CommonLoadUtils
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.{CollectionAccumulator, MergeIndexUtil}
@@ -36,6 +39,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
+import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -247,6 +251,7 @@ class CarbonTableCompactor(
.sparkContext
.collectionAccumulator[Map[String, SegmentMetaDataInfo]]
+ val updatePartitionSpecs : List[PartitionSpec] = new util.ArrayList[PartitionSpec]
var mergeRDD: CarbonMergerRDD[String, Boolean] = null
if (carbonTable.isHivePartitionTable) {
// collect related partitions
@@ -263,6 +268,19 @@ class CarbonTableCompactor(
if (partitionSpecs != null && partitionSpecs.nonEmpty) {
compactionCallableModel.compactedPartitions = Some(partitionSpecs)
}
+ partitionSpecs.foreach(partitionSpec => {
+ breakable {
+ carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaDetail => {
+ if (loadMetaDetail.getPath != null &&
+ loadMetaDetail.getPath.split(",").contains(partitionSpec.getLocation.toString)) {
+ // if partition spec added is external path,
+ // after compaction location path to be updated with table path.
+ updatePartitionSpecs.add(partitionSpec)
+ break()
+ }
+ })
+ }
+ })
}
val mergeStatus =
@@ -276,7 +294,26 @@ class CarbonTableCompactor(
segmentMetaDataAccumulator)
} else {
if (mergeRDD != null) {
- mergeRDD.collect
+ val result = mergeRDD.collect
+ if (!updatePartitionSpecs.isEmpty) {
+ val tableIdentifier = new TableIdentifier(carbonTable.getTableName,
+ Some(carbonTable.getDatabaseName))
+ val partitionSpecs = updatePartitionSpecs.asScala.map {
+ partitionSpec =>
+ // replaces old partitionSpec with updated partitionSpec
+ mergeRDD.checkAndUpdatePartitionLocation(partitionSpec)
+ PartitioningUtils.parsePathFragment(
+ String.join(CarbonCommonConstants.FILE_SEPARATOR, partitionSpec.getPartitions))
+ }
+ // To update partitionSpec in hive metastore, drop and add with latest path.
+ AlterTableDropPartitionCommand(
+ tableIdentifier,
+ partitionSpecs,
+ true, false, true).run(sqlContext.sparkSession)
+ AlterTableAddPartitionCommand(tableIdentifier,
+ partitionSpecs.map(p => (p, None)), false).run(sqlContext.sparkSession)
+ }
+ result
} else {
new CarbonMergerRDD(
sc.sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 26ef3b7..7933a25 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AlterTableModel, AtomicRunnableCommand}
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -125,6 +126,19 @@ case class CarbonAlterTableAddHivePartitionCommand(
loadModel.setColumnCompressor(columnCompressor)
loadModel.setCarbonTransactionalTable(true)
loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
+ // create operationContext to fire load events
+ val operationContext: OperationContext = new OperationContext
+ val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(
+ sparkSession = sparkSession,
+ carbonLoadModel = loadModel,
+ uuid = "",
+ factPath = "",
+ null,
+ null,
+ isOverwriteTable = false,
+ isDataFrame = false,
+ updateModel = None,
+ operationContext = operationContext)
// Create new entry in tablestatus file
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
@@ -133,6 +147,9 @@ case class CarbonAlterTableAddHivePartitionCommand(
loadModel.getSegmentId,
String.valueOf(loadModel.getFactTimeStamp)) + CarbonTablePath.SEGMENT_EXT
newMetaEntry.setSegmentFile(segmentFileName)
+ // set path to identify it as external added partition
+ newMetaEntry.setPath(partitionSpecsAndLocsTobeAdded.asScala
+ .map(_.getLocation.toString).mkString(","))
val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
@@ -169,6 +186,13 @@ case class CarbonAlterTableAddHivePartitionCommand(
customSegmentIds = customSegmentIds)
val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
+ // fire event to load data to materialized views
+ CommonLoadUtils.firePostLoadEvents(sparkSession,
+ loadModel,
+ tableIndexes,
+ indexOperationContext,
+ table,
+ operationContext)
}
}
Seq.empty[Row]
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
index 4c41b3a..229d6df 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
@@ -26,6 +26,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.sdk.file.CarbonWriter
/**
* Test class for MV to verify partition scenarios
@@ -770,5 +772,33 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll with BeforeAn
assert(sql("select timeseries(b,'day'),c from partitionone group by timeseries(b,'day'),c").collect().length == 1)
sql("drop table if exists partitionone")
}
+
+ test("test mv with add partition based on location on partition table") {
+ sql("drop table if exists partition_table")
+ sql("create table partition_table (id int,name String) " +
+ "partitioned by(email string) stored as carbondata")
+ sql("drop materialized view if exists partitiontable_mv")
+ sql("CREATE materialized view partitiontable_mv as select name from
partition_table")
+ sql("insert into partition_table select 1,'blue','abc'")
+ val schemaFile =
+ CarbonTablePath.getSchemaFilePath(
+ CarbonEnv.getCarbonTable(None, "partition_table")(sqlContext.sparkSession).getTablePath)
+ val sdkWritePath = target + "/" + "def"
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+ val writer = CarbonWriter.builder()
+ .outputPath(sdkWritePath)
+ .writtenBy("test")
+ .withSchemaFile(schemaFile)
+ .withCsvInput()
+ .build()
+ writer.write(Seq("2", "red", "def").toArray)
+ writer.write(Seq("3", "black", "def").toArray)
+ writer.close()
+ sql(s"alter table partition_table add partition (email='def') location
'$sdkWritePath'")
+ val extSegmentQuery = sql("select name from partition_table")
+ assert(TestUtil.verifyMVHit(extSegmentQuery.queryExecution.optimizedPlan, "partitiontable_mv"))
+ checkAnswer(extSegmentQuery, Seq(Row("blue"), Row("red"), Row("black")))
+ sql("drop table if exists partition_table")
+ }
// scalastyle:on lineLength
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 5576133..955469d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -824,9 +824,9 @@ public final class CarbonDataMergerUtil {
//check if this load is an already merged load.
if (null != segment.getMergedLoadName()) {
segments
- .add(Segment.getSegment(segment.getMergedLoadName(), segment.getSegmentFile(), null));
+ .add(new Segment(segment.getMergedLoadName(), segment.getSegmentFile(), null, segment));
} else {
- segments.add(Segment.getSegment(segment.getLoadName(), segment.getSegmentFile(), null));
+ segments.add(new Segment(segment.getLoadName(), segment.getSegmentFile(), null, segment));
}
}
return segments;
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 3a68410..dcf70b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -687,6 +687,16 @@ public final class CarbonDataProcessorUtil {
}
/**
+ * This method will get the store location for the given path, partition spec
+ *
+ * @return data directory path
+ */
+ public static String createCarbonStoreLocationForPartition(CarbonTable carbonTable,
+ String partition) {
+ return carbonTable.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + partition;
+ }
+
+ /**
* initialise data type for measures for their storage format
*/
public static DataType[] initDataType(CarbonTable carbonTable, String tableName,