This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 3630b12 [CARBONDATA-3566] Support add segment for partition table
3630b12 is described below
commit 3630b12665bd06b1e6f8ae91a7f23bec7bad47d9
Author: Jacky Li <[email protected]>
AuthorDate: Mon Nov 4 15:28:30 2019 +0800
[CARBONDATA-3566] Support add segment for partition table
CarbonData already supports ADD SEGMENT for non-partition tables;
it should also be supported for Hive partition tables.
This closes #3431
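For illustration, a minimal usage sketch based on the test cases and parser change
in this commit (the table name and parquet path below are placeholders, not part of
the change):

    -- carbon_table is partitioned by (name string, age int);
    -- /tmp/parquet_table is an existing parquet folder laid out as name=.../age=.../<data files>
    alter table carbon_table add segment
      options ('path'='/tmp/parquet_table', 'format'='parquet', 'partition'='name:string,age:int')

The 'partition' option must list the same partition columns (column_name:data_type)
as the target carbon table; otherwise the command is rejected.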
---
.../carbondata/core/metadata/SegmentFileStore.java | 44 +++--
.../core/writer/CarbonIndexFileMergeWriter.java | 2 +-
.../hadoop/api/CarbonOutputCommitter.java | 2 +-
.../testsuite/addsegment/AddSegmentTestCase.scala | 154 ++++++++++++++++-
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../command/management/CarbonAddLoadCommand.scala | 184 +++++++++++++++++----
.../execution/strategy/MixedFormatHandler.scala | 132 +++++++++++----
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 6 +-
8 files changed, 446 insertions(+), 80 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 57eb46d..e7feb3f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -49,6 +49,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -242,24 +243,36 @@ public class SegmentFileStore {
return false;
}
- public static boolean writeSegmentFileForOthers(CarbonTable carbonTable, Segment segment)
+ public static boolean writeSegmentFileForOthers(
+ CarbonTable carbonTable,
+ Segment segment,
+ PartitionSpec partitionSpec,
+ List<FileStatus> partitionDataFiles)
throws IOException {
String tablePath = carbonTable.getTablePath();
- CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath());
- CarbonFile[] otherFiles = segmentFolder.listFiles(new CarbonFileFilter() {
- @Override
- public boolean accept(CarbonFile file) {
- return (!file.getName().equals("_SUCCESS") && !file.getName().endsWith(".crc"));
- }
- });
- if (otherFiles != null && otherFiles.length > 0) {
+ CarbonFile[] dataFiles = null;
+ if (partitionDataFiles.isEmpty()) {
+ CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath());
+ dataFiles = segmentFolder.listFiles(
+ file -> (!file.getName().equals("_SUCCESS") && !file.getName().endsWith(".crc")));
+ } else {
+ dataFiles = partitionDataFiles.stream().map(
+ fileStatus -> FileFactory.getCarbonFile(
+ fileStatus.getPath().toString())).toArray(CarbonFile[]::new);
+ }
+ if (dataFiles != null && dataFiles.length > 0) {
SegmentFile segmentFile = new SegmentFile();
segmentFile.setOptions(segment.getOptions());
FolderDetails folderDetails = new FolderDetails();
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
folderDetails.setRelative(false);
- segmentFile.addPath(segment.getSegmentPath(), folderDetails);
- for (CarbonFile file : otherFiles) {
+ if (!partitionDataFiles.isEmpty()) {
+ folderDetails.setPartitions(partitionSpec.getPartitions());
+ segmentFile.addPath(partitionSpec.getLocation().toString(), folderDetails);
+ } else {
+ segmentFile.addPath(segment.getSegmentPath(), folderDetails);
+ }
+ for (CarbonFile file : dataFiles) {
folderDetails.getFiles().add(file.getName());
}
String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
@@ -437,18 +450,19 @@ public class SegmentFileStore {
* @return boolean which determines whether status update is done or not.
* @throws IOException
*/
- public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId,
+ public static boolean updateTableStatusFile(CarbonTable carbonTable, String segmentId,
String segmentFile, String tableId, SegmentFileStore segmentFileStore)
throws IOException {
- return updateSegmentFile(carbonTable, segmentId, segmentFile, tableId, segmentFileStore, null);
+ return updateTableStatusFile(carbonTable, segmentId, segmentFile, tableId, segmentFileStore,
+ null);
}
/**
- * This API will update the segmentFile of a passed segment.
+ * This API will update the table status file with specified segment.
*
* @return boolean which determines whether status update is done or not.
* @throws IOException
*/
- public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId,
+ public static boolean updateTableStatusFile(CarbonTable carbonTable, String segmentId,
String segmentFile, String tableId, SegmentFileStore segmentFileStore,
SegmentStatus segmentStatus) throws IOException {
boolean status = false;
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index c9d4c26..4760bdc 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -193,7 +193,7 @@ public class CarbonIndexFileMergeWriter {
+ CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
if (!table.isHivePartitionTable()) {
SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
- SegmentFileStore.updateSegmentFile(table, segmentId, newSegmentFileName,
+ SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 21861d9..549ca7c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -144,7 +144,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
uuid = operationContext.getProperty("uuid").toString();
}
- SegmentFileStore.updateSegmentFile(carbonTable, loadModel.getSegmentId(),
+ SegmentFileStore.updateTableStatusFile(carbonTable, loadModel.getSegmentId(),
segmentFileName + CarbonTablePath.SEGMENT_EXT,
carbonTable.getCarbonTableIdentifier().getTableId(),
new SegmentFileStore(carbonTable.getTablePath(),
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 129c0f0..c9b5bf6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -19,10 +19,10 @@ package org.apache.carbondata.spark.testsuite.addsegment
import java.io.File
import java.nio.file.{Files, Paths}
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter, Field, Schema}
+import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
import org.junit.Assert
import scala.io.Source
@@ -595,6 +596,155 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists addSegParmore")
}
+ test("test add segment partition table") {
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists orc_table")
+
+ sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)")
+ sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
+ sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
+ sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
+ sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
+ sql("select * from parquet_table").show
+ val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("parquet_table")).location
+
+ // add data from parquet table to carbon table
+ sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
+ checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+ // load new data into carbon table
+ sql("insert into carbon_table select * from parquet_table")
+ checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table union all select * from parquet_table"))
+
+ // add another data from orc table to carbon table
+ sql("create table orc_table(value int, name string, age int) using orc partitioned by (name, age)")
+ sql("insert into orc_table values (30, 'orc', 50), (40, 'orc', 13)")
+ sql("insert into orc_table values (30, 'fast', 10), (10, 'fast', 13)")
+ val orcRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("orc_table")).location
+ sql(s"alter table carbon_table add segment options ('path'='$orcRootPath', 'format'='orc', 'partition'='name:string,age:int')")
+ checkAnswer(sql("select * from carbon_table"),
+ sql("select * from parquet_table " +
+ "union all select * from parquet_table " +
+ "union all select * from orc_table"))
+
+ // filter query on partition column
+ checkAnswer(sql("select count(*) from carbon_table where name = 'amy'"), Row(4))
+
+ // do compaction
+ sql("alter table carbon_table compact 'major'")
+ checkAnswer(sql("select * from carbon_table"),
+ sql("select * from parquet_table " +
+ "union all select * from parquet_table " +
+ "union all select * from orc_table"))
+
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists orc_table")
+ }
+
+ test("show segment after add segment to partition table") {
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+
+ sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)")
+ sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
+ sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
+ sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
+ sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
+ sql("select * from parquet_table").show
+ val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("parquet_table")).location
+
+ // add data from parquet table to carbon table
+ sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
+ checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+ // test show segment
+ checkExistence(sql(s"show segments for table carbon_table"), true, "spark-common/target/warehouse/parquet_table")
+ checkExistence(sql(s"show history segments for table carbon_table"), true, "spark-common/target/warehouse/parquet_table")
+
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+ }
+
+ test("test add segment partition table, missing partition option") {
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+
+ sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)")
+ sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
+ sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
+ sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
+ sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
+ sql("select * from parquet_table").show
+ val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("parquet_table")).location
+
+ // add data from parquet table to carbon table
+ val exception = intercept[AnalysisException](
+ sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet')")
+ )
+ assert(exception.message.contains("partition option is required"))
+
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+ }
+
+ test("test add segment partition table, unmatched partition") {
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+
+ sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name)")
+ sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
+ sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
+ sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
+ sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
+ sql("select * from parquet_table").show
+ val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("parquet_table")).location
+
+ // add data from parquet table to carbon table
+ // unmatched partition
+ var exception = intercept[AnalysisException](
+ sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string')")
+ )
+ assert(exception.message.contains("Partition is not same"))
+
+ // incorrect partition option
+ exception = intercept[AnalysisException](
+ sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
+ )
+ assert(exception.message.contains("input segment path does not comply to partitions in carbon table"))
+
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+ }
+
+ test("test add segment partition table, incorrect partition") {
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+
+ sql("create table parquet_table(value int) using parquet")
+ sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
+ sql("insert into parquet_table values (30), (40)")
+ sql("select * from parquet_table").show
+ val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("parquet_table")).location
+
+ // add data from parquet table to carbon table
+ // incorrect partition option
+ val exception = intercept[RuntimeException](
+ sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
+ )
+ assert(exception.getMessage.contains("invalid partition path"))
+
+ sql("drop table if exists parquet_table")
+ sql("drop table if exists carbon_table")
+ }
+
private def copyseg(tableName: String, pathName: String): String = {
val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier(tableName))
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 031f539..488468a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -535,7 +535,7 @@ object CarbonDataRDDFactory {
SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
String.valueOf(carbonLoadModel.getFactTimeStamp))
- SegmentFileStore.updateSegmentFile(
+ SegmentFileStore.updateTableStatusFile(
carbonTable,
carbonLoadModel.getSegmentId,
segmentFileName,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 7b2c088..e9025ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -23,8 +23,10 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil.convertSparkToCarbonDataType
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{Checker, MetadataCommand}
import org.apache.spark.sql.execution.strategy.MixedFormatHandler
import org.apache.spark.sql.hive.CarbonRelation
@@ -36,8 +38,10 @@ import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.{PartitionSpec => CarbonPartitionSpec}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -47,7 +51,6 @@ import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, Car
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.sdk.file.{Field, Schema}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.clearDataMapFiles
-import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
@@ -89,45 +92,135 @@ case class CarbonAddLoadCommand(
if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
}
- val segmentPath = options.getOrElse(
- "path", throw new UnsupportedOperationException("PATH is manadatory"))
- val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val inputPath = options.getOrElse(
+ "path", throw new UnsupportedOperationException("PATH is mandatory"))
// If a path is already added then we should block the adding of the same path again.
- if (allSegments.exists(a =>
- a.getPath != null && a.getPath.equalsIgnoreCase(segmentPath)
- )) {
+ val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ if (allSegments.exists(a => a.getPath != null && a.getPath.equalsIgnoreCase(inputPath))) {
throw new AnalysisException(s"path already exists in table status file, can not add same " +
- s"segment path repeatedly: $segmentPath")
+ s"segment path repeatedly: $inputPath")
}
val format = options.getOrElse("format", "carbondata")
val isCarbonFormat = format.equalsIgnoreCase("carbondata") ||
format.equalsIgnoreCase("carbon")
// If in the given location no carbon index files are found then we should throw an exception
- if (isCarbonFormat && SegmentFileStore.getListOfCarbonIndexFiles(segmentPath).isEmpty) {
+ if (isCarbonFormat && SegmentFileStore.getListOfCarbonIndexFiles(inputPath).isEmpty) {
throw new AnalysisException("CarbonIndex files not present in the location")
}
- val segSchema = MixedFormatHandler.getSchema(sparkSession, options, segmentPath)
-
- val segCarbonSchema = new Schema(segSchema.fields.map { field =>
+ // infer schema and collect FileStatus for all partitions
+ val (inputPathSchema, lastLevelDirFileMap) =
+ MixedFormatHandler.collectInfo(sparkSession, options, inputPath)
+ val inputPathCarbonFields = inputPathSchema.fields.map { field =>
val dataType = convertSparkToCarbonDataType(field.dataType)
new Field(field.name, dataType)
- })
-
- val tableCarbonSchema = new Schema(tableSchema.fields.map { field =>
+ }
+ val carbonTableSchema = new Schema(tableSchema.fields.map { field =>
val dataType = convertSparkToCarbonDataType(field.dataType)
new Field(field.name, dataType)
})
+ // update schema if has partition
+ val inputPathTableFields = if (carbonTable.isHivePartitionTable) {
+ val partitions = options.getOrElse("partition",
+ throw new AnalysisException(
+ "partition option is required when adding segment to partition table")
+ )
+ // extract partition given by user, partition option should be form of "a:int, b:string"
+ val partitionFields = partitions
+ .split(",")
+ .map { input =>
+ if (input.nonEmpty) {
+ val nameAndDataType = input.trim.toLowerCase.split(":")
+ if (nameAndDataType.size == 2) {
+ new Field(nameAndDataType(0), nameAndDataType(1))
+ } else {
+ throw new AnalysisException(s"invalid partition option: ${ options.toString() }")
+ }
+ }
+ }
+ // validate against the partition in carbon table
+ val carbonTablePartition = getCarbonTablePartition(sparkSession)
+ if (!partitionFields.sameElements(carbonTablePartition)) {
+ throw new AnalysisException(
+ s"""
+ |Partition is not same. Carbon table partition is :
+ |${carbonTablePartition.mkString(",")} and input segment partition is :
+ |${partitionFields.mkString(",")}
+ |""".stripMargin)
+ }
+ inputPathCarbonFields ++ partitionFields
+ } else {
+ if (options.contains("partition")) {
+ throw new AnalysisException(
+ s"Invalid option: partition, $tableName is not a partitioned table")
+ }
+ inputPathCarbonFields
+ }
- if (!tableCarbonSchema.getFields.forall(f => segCarbonSchema.getFields.exists(_.equals(f)))) {
+ // validate the schema including partition columns
+ val schemaMatched = carbonTableSchema.getFields.forall { field =>
+ inputPathTableFields.exists(_.equals(field))
+ }
+ if (!schemaMatched) {
throw new AnalysisException(s"Schema is not same. Table schema is : " +
- s"${tableSchema} and segment schema is : ${segSchema}")
+ s"${tableSchema} and segment schema is : ${inputPathSchema}")
+ }
+
+ // all validation is done, update the metadata accordingly
+ if (carbonTable.isHivePartitionTable) {
+ // for each partition in input path, create a new segment in carbon table
+ val partitionSpecs = collectPartitionSpecList(
+ sparkSession, carbonTable.getTablePath, inputPath, lastLevelDirFileMap.keys.toSeq)
+ // check the collected partition from input segment path should comply to
+ // partitions in carbon table
+ val carbonTablePartition = getCarbonTablePartition(sparkSession)
+ if (partitionSpecs.head.getPartitions.size() != carbonTablePartition.length) {
+ throw new AnalysisException(
+ s"""
+ |input segment path does not comply to partitions in carbon table:
+ |${carbonTablePartition.mkString(",")}
+ |""".stripMargin)
+ }
+ partitionSpecs.foreach { partitionSpec =>
+ val dataFiles = lastLevelDirFileMap.getOrElse(partitionSpec.getLocation.toString,
+ throw new RuntimeException(s"partition folder not found: ${partitionSpec.getLocation}"))
+ writeMetaForSegment(sparkSession, carbonTable, inputPath, Some(partitionSpec), dataFiles)
+ }
+ } else {
+ writeMetaForSegment(sparkSession, carbonTable, inputPath)
}
+ Seq.empty
+ }
+
+ private def getCarbonTablePartition(sparkSession: SparkSession): Array[Field] = {
+ sparkSession.sessionState.catalog
+ .getTableMetadata(TableIdentifier(tableName, databaseNameOp))
+ .partitionSchema
+ .fields
+ .map(f => new Field(f.name, convertSparkToCarbonDataType(f.dataType)))
+ }
+
+ /**
+ * Write metadata for external segment, including table status file and segment file
+ *
+ * @param sparkSession spark session
+ * @param carbonTable carbon table
+ * @param segmentPath external segment path specified by user
+ * @param partitionSpecOp partition info extracted from the path
+ * @param partitionDataFiles all data files in the partition
+ */
+ private def writeMetaForSegment(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ segmentPath: String,
+ partitionSpecOp: Option[CarbonPartitionSpec] = None,
+ partitionDataFiles: Seq[FileStatus] = Seq.empty
+ ): Unit = {
val model = new CarbonLoadModel
model.setCarbonTransactionalTable(true)
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
@@ -147,8 +240,8 @@ case class CarbonAddLoadCommand(
val dataMapNames: mutable.Buffer[String] =
tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
- new BuildDataMapPreExecutionEvent(sparkSession,
- carbonTable.getAbsoluteTableIdentifier, dataMapNames)
+ BuildDataMapPreExecutionEvent(
+ sparkSession, carbonTable.getAbsoluteTableIdentifier, dataMapNames)
OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
dataMapOperationContext)
}
@@ -160,12 +253,16 @@ case class CarbonAddLoadCommand(
model.getFactTimeStamp,
false)
newLoadMetaEntry.setPath(segmentPath)
+ val format = options.getOrElse("format", "carbondata")
+ val isCarbonFormat = format.equalsIgnoreCase("carbondata") ||
+ format.equalsIgnoreCase("carbon")
if (!isCarbonFormat) {
newLoadMetaEntry.setFileFormat(new FileFormat(format))
}
CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false)
- val segment = new Segment(model.getSegmentId,
+ val segment = new Segment(
+ model.getSegmentId,
SegmentFileStore.genSegmentFileName(
model.getSegmentId,
System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
@@ -175,19 +272,24 @@ case class CarbonAddLoadCommand(
if (isCarbonFormat) {
SegmentFileStore.writeSegmentFile(carbonTable, segment)
} else {
- SegmentFileStore.writeSegmentFileForOthers(carbonTable, segment)
+ SegmentFileStore.writeSegmentFileForOthers(
+ carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava)
}
- operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
- model.getSegmentId)
- val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
- new LoadTablePreStatusUpdateEvent(
- carbonTable.getCarbonTableIdentifier,
- model)
-
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent,
operationContext)
+ // This event will trigger merge index job, only trigger it if it is carbon file
+ if (isCarbonFormat) {
+ operationContext.setProperty(
+ carbonTable.getTableUniqueName + "_Segment",
+ model.getSegmentId)
+ val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+ new LoadTablePreStatusUpdateEvent(
+ carbonTable.getCarbonTableIdentifier,
+ model)
+
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent,
operationContext)
+ }
val success = if (writeSegment) {
- SegmentFileStore.updateSegmentFile(
+ SegmentFileStore.updateTableStatusFile(
carbonTable,
model.getSegmentId,
segment.getSegmentFileName,
@@ -241,10 +343,30 @@ case class CarbonAddLoadCommand(
OperationListenerBus.getInstance()
.fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
}
- Seq.empty
}
-
+ // extract partition column and value, for example, given
+ // path1 = path/to/partition/a=1/b=earth
+ // path2 = path/to/partition/a=2/b=moon
+ // will extract a list of CarbonPartitionSpec:
+ // CarbonPartitionSpec {("a=1","b=earth"), "path/to/partition"}
+ // CarbonPartitionSpec {("a=2","b=moon"), "path/to/partition"}
+ def collectPartitionSpecList(
+ sparkSession: SparkSession,
+ tablePath: String,
+ inputPath: String,
+ partitionPaths: Seq[String]
+ ): Seq[CarbonPartitionSpec] = {
+ partitionPaths.map { path =>
+ try {
+ val partitionOnlyPath = path.substring(inputPath.length + 1)
+ val partitionColumnAndValue = partitionOnlyPath.split("/").toList.asJava
+ new CarbonPartitionSpec(partitionColumnAndValue, path)
+ } catch {
+ case t: Throwable => throw new RuntimeException(s"invalid partition path: $path")
+ }
+ }
+ }
override protected def opName: String = "ADD SEGMENT WITH PATH"
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index 26c0fb0..3191ba8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -19,14 +19,17 @@ package org.apache.spark.sql.execution.strategy
import java.util
import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{execution, MixedFormatHandlerUtil, SparkSession}
+import org.apache.spark.sql.{MixedFormatHandlerUtil, SparkSession}
import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Cast, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -56,22 +59,76 @@ object MixedFormatHandler {
supportedFormats.exists(_.equalsIgnoreCase(format))
}
- def getSchema(sparkSession: SparkSession,
+ /**
+ * collect schema, list of last level directory and list of all data files under given path
+ *
+ * @param sparkSession spark session
+ * @param options option for ADD SEGMENT
+ * @param inputPath under which path to collect
+ * @return schema of the data file, map of last level directory (partition folder) to its
+ * children file list (data files)
+ */
+ def collectInfo(
+ sparkSession: SparkSession,
options: Map[String, String],
- segPath: String): StructType = {
- val format = options.getOrElse("format", "carbondata")
- if ((format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon"))) {
- new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get
+ inputPath: String): (StructType, mutable.Map[String, Seq[FileStatus]]) = {
+ val path = new Path(inputPath)
+ val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+ val rootPath = fs.getFileStatus(path)
+ val leafDirFileMap = collectAllLeafFileStatus(sparkSession, rootPath, fs)
+ val format = options.getOrElse("format", "carbondata").toLowerCase
+ val fileFormat = if (format.equalsIgnoreCase("carbondata") ||
+ format.equalsIgnoreCase("carbon")) {
+ new SparkCarbonFileFormat()
} else {
- val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\", "/"))
- val path = new Path(filePath)
- val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
- val status = fs.listStatus(path, new PathFilter {
- override def accept(path: Path): Boolean = {
- !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc")
- }
- })
- getFileFormat(new FileFormatName(format)).inferSchema(sparkSession, options, status).get
+ getFileFormat(new FileFormatName(format))
+ }
+ if (leafDirFileMap.isEmpty) {
+ throw new RuntimeException("no partition data is found")
+ }
+ val schema = fileFormat.inferSchema(sparkSession, options, leafDirFileMap.head._2).get
+ (schema, leafDirFileMap)
+ }
+
+ /**
+ * collect leaf directories and leaf files recursively in given path
+ *
+ * @param sparkSession spark session
+ * @param path path to collect
+ * @param fs hadoop file system
+ * @return mapping of leaf directory to its children files
+ */
+ private def collectAllLeafFileStatus(
+ sparkSession: SparkSession,
+ path: FileStatus,
+ fs: FileSystem): mutable.Map[String, Seq[FileStatus]] = {
+ val directories: ArrayBuffer[FileStatus] = ArrayBuffer()
+ val leafFiles: ArrayBuffer[FileStatus] = ArrayBuffer()
+ val lastLevelFileMap = mutable.Map[String, Seq[FileStatus]]()
+
+ // get all files under input path
+ val fileStatus = fs.listStatus(path.getPath, new PathFilter {
+ override def accept(path: Path): Boolean = {
+ !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc")
+ }
+ })
+ // collect directories and files
+ fileStatus.foreach { file =>
+ if (file.isDirectory) directories.append(file)
+ else leafFiles.append(file)
+ }
+ if (leafFiles.nonEmpty) {
+ // leaf file is found, so parent folder (input parameter) is the last level dir
+ val updatedPath = FileFactory.getUpdatedFilePath(path.getPath.toString)
+ lastLevelFileMap.put(updatedPath, leafFiles)
+ lastLevelFileMap
+ } else {
+ // no leaf file is found, for each directory, collect recursively
+ directories.foreach { dir =>
+ val map = collectAllLeafFileStatus(sparkSession, dir, fs)
+ lastLevelFileMap ++= map
+ }
+ lastLevelFileMap
}
}
@@ -83,7 +140,8 @@ object MixedFormatHandler {
* If multiple segments are with different formats like parquet , orc etc then it creates RDD for
* each format segments and union them.
*/
- def extraRDD(l: LogicalRelation,
+ def extraRDD(
+ l: LogicalRelation,
projects: Seq[NamedExpression],
filters: Seq[Expression],
readCommittedScope: ReadCommittedScope,
@@ -99,13 +157,28 @@ object MixedFormatHandler {
.filter(l => segsToAccess.isEmpty ||
segsToAccess.contains(l.getLoadName))
.groupBy(_.getFileFormat)
.map { case (format, detailses) =>
- val paths = detailses.flatMap { d =>
- SegmentFileStore.readSegmentFile(CarbonTablePath.getSegmentFilePath(readCommittedScope
- .getFilePath, d.getSegmentFile)).getLocationMap.asScala.flatMap { case (p, f) =>
- f.getFiles.asScala.map { ef =>
- new Path(p + CarbonCommonConstants.FILE_SEPARATOR + ef)
+ // collect paths as input to scan RDD
+ val paths = detailses.flatMap { d =>
+ val segmentFile = SegmentFileStore.readSegmentFile(
+ CarbonTablePath.getSegmentFilePath(readCommittedScope.getFilePath, d.getSegmentFile))
+
+ // If it is a partition table, the path to create RDD should be the root path of the
+ // partition folder (excluding the partition subfolder).
+ // If it is not a partition folder, collect all data file paths
+ if (segmentFile.getOptions.containsKey("partition")) {
+ val segmentPath = segmentFile.getOptions.get("path")
+ if (segmentPath == null) {
+ throw new RuntimeException("invalid segment file, 'path' option not found")
+ }
+ Seq(new Path(segmentPath))
+ } else {
+ // If it is not a partition folder, collect all data file paths to create RDD
+ segmentFile.getLocationMap.asScala.flatMap { case (p, f) =>
+ f.getFiles.asScala.map { ef =>
+ new Path(p + CarbonCommonConstants.FILE_SEPARATOR + ef)
+ }.toSeq
}.toSeq
- }.toSeq
+ }
}
val fileFormat = getFileFormat(format, supportBatch)
getRDDForExternalSegments(l, projects, filters, fileFormat, paths)
@@ -125,7 +198,7 @@ object MixedFormatHandler {
rdd = rdd.union(r._1)
}
}
- Some(rdd, !rdds.exists(!_._2))
+ Some(rdd, rdds.forall(_._2))
}
}
} else {
@@ -178,10 +251,13 @@ object MixedFormatHandler {
case Some(catalogTable) =>
val fileIndex =
new InMemoryFileIndex(sparkSession, paths, catalogTable.storage.properties, None)
+ // exclude the partition in data schema
+ val dataSchema = catalogTable.schema.filterNot { column =>
+ catalogTable.partitionColumnNames.contains(column.name)}
HadoopFsRelation(
fileIndex,
catalogTable.partitionSchema,
- catalogTable.schema,
+ new StructType(dataSchema.toArray),
catalogTable.bucketSpec,
fileFormat,
catalogTable.storage.properties)(sparkSession)
@@ -253,11 +329,11 @@ object MixedFormatHandler {
dataFilters,
l.catalogTable.map(_.identifier))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
- val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
+ val withFilter = afterScanFilter.map(FilterExec(_, scan)).getOrElse(scan)
val withProjections = if (projects == withFilter.output) {
withFilter
} else {
- execution.ProjectExec(projects, withFilter)
+ ProjectExec(projects, withFilter)
}
(withProjections.inputRDDs().head, fileFormat.supportBatch(sparkSession, outputSchema))
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 8670b13..82ea8f6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -483,7 +483,11 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
/**
- * ALTER TABLE <db.tableName> ADD SEGMENT OPTIONS('path'='path','''key'='value')
+ * ALTER TABLE [dbName.]tableName ADD SEGMENT
+ * OPTIONS('path'='path','format'='format', ['partition'='schema list'])
+ *
+ * schema list format: column_name:data_type
+ * for example: 'partition'='a:int,b:string'
*/
protected lazy val addLoad: Parser[LogicalPlan] =
ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~