This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 93e52ff [CARBONDATA-3516] Supporting mixed formats in carbon
93e52ff is described below
commit 93e52ffc723c255885ca83fcba6d919eef459984
Author: ravipesala <[email protected]>
AuthorDate: Wed Sep 18 15:22:28 2019 +0530
[CARBONDATA-3516] Supporting mixed formats in carbon
This PR adds support for mixed formats in the Spark Carbon integration. Based
on the segment format recorded in PR 3381, each non-carbon segment is read
through the corresponding Spark file format, and the resulting RDDs of all
formats are unioned with the carbon scan.
This closes #3392
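For readers of this commit, a minimal usage sketch follows. It assumes a test
context where sql(...) is available (as in the QueryTest suites in this patch);
the table name and parquet path are illustrative placeholders, not values from
the patch.

    // Create a carbon table and load some data into it.
    sql("CREATE TABLE addseg_demo (a INT, b STRING) STORED BY 'carbondata'")
    sql("INSERT INTO addseg_demo VALUES (1, 'x')")

    // Register an externally written parquet folder as an additional segment.
    // '/tmp/parquet_segment' is a placeholder for a folder of parquet files
    // whose schema matches the table schema.
    sql("ALTER TABLE addseg_demo ADD SEGMENT " +
      "OPTIONS('path'='/tmp/parquet_segment', 'format'='parquet')")

    // Queries now read the carbon and parquet segments together.
    sql("SELECT count(*) FROM addseg_demo").show()

After this patch, SHOW SEGMENTS on the table also reports each segment's file
format and path (see the CarbonShowLoadsCommand and CarbonStore changes below).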
---
.../apache/carbondata/core/datamap/Segment.java | 7 +
.../carbondata/core/datamap/TableDataMap.java | 31 +-
.../carbondata/core/metadata/SegmentFileStore.java | 33 ++
.../core/statusmanager/LoadMetadataDetails.java | 5 +
.../testsuite/addsegment/AddSegmentTestCase.scala | 335 ++++++++++++++++++++-
.../org/apache/carbondata/api/CarbonStore.scala | 18 +-
.../spark/rdd/CarbonTableCompactor.scala | 2 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 3 +-
.../command/management/CarbonAddLoadCommand.scala | 41 ++-
.../management/CarbonShowLoadsCommand.scala | 6 +-
.../strategy/CarbonLateDecodeStrategy.scala | 71 +++--
.../execution/strategy/MixedFormatHandler.scala | 289 ++++++++++++++++++
12 files changed, 791 insertions(+), 50 deletions(-)
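As described in the commit message, non-carbon segments are grouped by their
recorded file format, one RDD is produced per format through the corresponding
Spark FileFormat, and those RDDs are unioned with the carbon scan (see
MixedFormatHandler.extraRDD and CarbonLateDecodeStrategy below). A conceptual
sketch of the union step, with illustrative names only:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow

    // Union the per-format RDDs onto the carbon scan RDD; when there are no
    // non-carbon segments the carbon RDD is returned unchanged.
    def unionWithOtherFormats(
        carbonScan: RDD[InternalRow],
        otherFormatScans: Seq[RDD[InternalRow]]): RDD[InternalRow] = {
      otherFormatScans.foldLeft(carbonScan)(_ union _)
    }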
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index a235f0d..b684090 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -324,6 +324,13 @@ public class Segment implements Serializable, Writable {
this.options = options;
}
+ public boolean isCarbonSegment() {
+ if (loadMetadataDetails != null) {
+ return loadMetadataDetails.isCarbonFormat();
+ }
+ return true;
+ }
+
@Override public void write(DataOutput out) throws IOException {
out.writeUTF(segmentNo);
boolean writeSegmentFileName = segmentFileName != null;
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index ff24a07..053f8fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -106,13 +106,14 @@ public final class TableDataMap extends
OperationEventListener {
/**
* Pass the valid segments and prune the datamap using filter expression
*
- * @param segments
+ * @param allsegments
* @param filter
* @return
*/
- public List<ExtendedBlocklet> prune(List<Segment> segments, final
DataMapFilter filter,
+ public List<ExtendedBlocklet> prune(List<Segment> allsegments, final
DataMapFilter filter,
final List<PartitionSpec> partitions) throws IOException {
final List<ExtendedBlocklet> blocklets = new ArrayList<>();
+ List<Segment> segments = getCarbonSegments(allsegments);
final Map<Segment, List<DataMap>> dataMaps =
dataMapFactory.getDataMaps(segments);
// for non-filter queries
// for filter queries
@@ -143,6 +144,16 @@ public final class TableDataMap extends
OperationEventListener {
return extendedBlocklets;
}
+ private List<Segment> getCarbonSegments(List<Segment> allsegments) {
+ List<Segment> segments = new ArrayList<>();
+ for (Segment segment : allsegments) {
+ if (segment.isCarbonSegment()) {
+ segments.add(segment);
+ }
+ }
+ return segments;
+ }
+
private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets) throws
IOException {
for (Segment segment : segments) {
@@ -343,8 +354,9 @@ public final class TableDataMap extends
OperationEventListener {
*
* @return
*/
- public List<DataMapDistributable> toDistributable(List<Segment> segments)
throws IOException {
+ public List<DataMapDistributable> toDistributable(List<Segment> allsegments)
throws IOException {
List<DataMapDistributable> distributables = new ArrayList<>();
+ List<Segment> segments = getCarbonSegments(allsegments);
for (Segment segment : segments) {
distributables.addAll(dataMapFactory.toDistributable(segment));
}
@@ -423,7 +435,8 @@ public final class TableDataMap extends
OperationEventListener {
/**
* delete only the datamaps of the segments
*/
- public void deleteDatamapData(List<Segment> segments) throws IOException {
+ public void deleteDatamapData(List<Segment> allsegments) throws IOException {
+ List<Segment> segments = getCarbonSegments(allsegments);
for (Segment segment: segments) {
dataMapFactory.deleteDatamapData(segment);
}
@@ -457,14 +470,15 @@ public final class TableDataMap extends
OperationEventListener {
/**
* Prune the datamap of the given segments and return the Map of blocklet
path and row count
*
- * @param segments
+ * @param allsegments
* @param partitions
* @return
* @throws IOException
*/
- public Map<String, Long> getBlockRowCount(List<Segment> segments,
+ public Map<String, Long> getBlockRowCount(List<Segment> allsegments,
final List<PartitionSpec> partitions, TableDataMap defaultDataMap)
throws IOException {
+ List<Segment> segments = getCarbonSegments(allsegments);
Map<String, Long> blockletToRowCountMap = new HashMap<>();
for (Segment segment : segments) {
List<CoarseGrainDataMap> dataMaps =
defaultDataMap.getDataMapFactory().getDataMaps(segment);
@@ -478,13 +492,14 @@ public final class TableDataMap extends
OperationEventListener {
/**
* Prune the datamap of the given segments and return the Map of blocklet
path and row count
*
- * @param segments
+ * @param allsegments
* @param partitions
* @return
* @throws IOException
*/
- public long getRowCount(List<Segment> segments, final List<PartitionSpec>
partitions,
+ public long getRowCount(List<Segment> allsegments, final List<PartitionSpec>
partitions,
TableDataMap defaultDataMap) throws IOException {
+ List<Segment> segments = getCarbonSegments(allsegments);
long totalRowCount = 0L;
for (Segment segment : segments) {
List<CoarseGrainDataMap> dataMaps =
defaultDataMap.getDataMapFactory().getDataMaps(segment);
diff --git
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 544de16..2f71bbc 100644
---
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -230,6 +230,39 @@ public class SegmentFileStore {
return false;
}
+ public static boolean writeSegmentFileForOthers(CarbonTable carbonTable,
Segment segment)
+ throws IOException {
+ String tablePath = carbonTable.getTablePath();
+ CarbonFile segmentFolder =
FileFactory.getCarbonFile(segment.getSegmentPath());
+ CarbonFile[] otherFiles = segmentFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return (!file.getName().equals("_SUCCESS") &&
!file.getName().endsWith(".crc"));
+ }
+ });
+ if (otherFiles != null && otherFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ segmentFile.setOptions(segment.getOptions());
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ folderDetails.setRelative(false);
+ segmentFile.addPath(segment.getSegmentPath(), folderDetails);
+ for (CarbonFile file : otherFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ String segmentFileFolder =
CarbonTablePath.getSegmentFilesLocation(tablePath);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(segmentFileFolder);
+ }
+ // write segment info to new file.
+ writeSegmentFile(segmentFile,
+ segmentFileFolder + File.separator + segment.getSegmentFileName());
+
+ return true;
+ }
+ return false;
+ }
+
/**
* Write segment file to the metadata folder of the table selecting only the
current load files
*
diff --git
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 17a48c5..355dbfd 100644
---
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -460,4 +460,9 @@ public class LoadMetadataDetails implements Serializable {
public void setPath(String path) {
this.path = path;
}
+
+ public boolean isCarbonFormat() {
+ return getFileFormat().equals(FileFormat.COLUMNAR_V3)
+ || getFileFormat().equals(FileFormat.ROW_V1);
+ }
}
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index c4acf8e..8358b1b 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -19,13 +19,11 @@ package org.apache.carbondata.spark.testsuite.addsegment
import java.io.File
import java.nio.file.{Files, Paths}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.CarbonProperties
@@ -263,6 +261,333 @@ class AddSegmentTestCase extends QueryTest with
BeforeAndAfterAll {
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
+
+ test("Test added segment with different format") {
+ sql("drop table if exists addsegment1")
+ sql("drop table if exists addsegment2")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE addsegment2 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int) using parquet
+ """.stripMargin)
+
+ sql(s"""insert into addsegment2 select * from addsegment1""")
+
+ sql("select * from addsegment2").show()
+ val table =
sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(
"addsegment2"))
+ val path = table.location.getPath
+ val newPath = storeLocation + "/" + "addsegtest"
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ copy(path, newPath)
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='parquet')").show()
+ assert(sql("select empname, designation, doj, workgroupcategory ,
workgroupcategoryname from addsegment1").collect().length == 20)
+ checkAnswer(sql("select empname from addsegment1 where empname='arvind'"),
Seq(Row("arvind"),Row("arvind")))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ test("Test added segment with different format more than two") {
+ sql("drop table if exists addsegment1")
+ sql("drop table if exists addsegment2")
+ sql("drop table if exists addsegment3")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE addsegment2 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int) using parquet
+ """.stripMargin)
+
+ sql(s"""insert into addsegment2 select * from addsegment1""")
+
+ sql(
+ """
+ | CREATE TABLE addsegment3 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int) using orc
+ """.stripMargin)
+
+ sql(s"""insert into addsegment3 select * from addsegment1""")
+
+ val newPath1 = copyseg("addsegment2", "addsegtest1")
+ val newPath2 = copyseg("addsegment3", "addsegtest2")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath1',
'format'='parquet')").show()
+ sql(s"alter table addsegment1 add segment options('path'='$newPath2',
'format'='orc')").show()
+ assert(sql("select empname, designation, doj, workgroupcategory ,
workgroupcategoryname from addsegment1").collect().length == 30)
+ checkAnswer(sql("select empname from addsegment1 where empname='arvind'"),
Seq(Row("arvind"),Row("arvind"),Row("arvind")))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(30)))
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+ FileFactory.deleteAllFilesOfDir(new File(newPath1))
+ FileFactory.deleteAllFilesOfDir(new File(newPath2))
+ }
+
+ test("Test added segment with different format more than two and use set
segment") {
+ sql("drop table if exists addsegment1")
+ sql("drop table if exists addsegment2")
+ sql("drop table if exists addsegment3")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE addsegment2 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int) using parquet
+ """.stripMargin)
+
+ sql(s"""insert into addsegment2 select * from addsegment1""")
+
+ sql(
+ """
+ | CREATE TABLE addsegment3 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int) using orc
+ """.stripMargin)
+
+ sql(s"""insert into addsegment3 select * from addsegment1""")
+
+ val newPath1 = copyseg("addsegment2", "addsegtest1")
+ val newPath2 = copyseg("addsegment3", "addsegtest2")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath1',
'format'='parquet')").show()
+ sql(s"alter table addsegment1 add segment options('path'='$newPath2',
'format'='orc')").show()
+
+ assert(sql("select empname, designation, doj, workgroupcategory ,
workgroupcategoryname from addsegment1").collect().length == 30)
+
+ sql("SET carbon.input.segments.default.addsegment1 = 0")
+ checkAnswer(sql("select empname from addsegment1 where empname='arvind'"),
Seq(Row("arvind")))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(10)))
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ sql("SET carbon.input.segments.default.addsegment1 = 0,1")
+ checkAnswer(sql("select empname from addsegment1 where empname='arvind'"),
Seq(Row("arvind"),Row("arvind")))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+
+ sql("SET carbon.input.segments.default.addsegment1 = *")
+ checkAnswer(sql("select empname from addsegment1 where empname='arvind'"),
Seq(Row("arvind"),Row("arvind"),Row("arvind")))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(30)))
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+ FileFactory.deleteAllFilesOfDir(new File(newPath1))
+ FileFactory.deleteAllFilesOfDir(new File(newPath2))
+ }
+
+ test("Test added segment with different format and test compaction") {
+ sql("drop table if exists addsegment1")
+ sql("drop table if exists addsegment2")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE addsegment2 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int) using parquet
+ """.stripMargin)
+
+ sql(s"""insert into addsegment2 select * from addsegment1""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ val table =
sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(
"addsegment2"))
+ val path = table.location.getPath
+ val newPath = storeLocation + "/" + "addsegtest"
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ copy(path, newPath)
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='parquet')").show()
+ sql("alter table addsegment1 compact 'major'")
+ assert(sql("select empname, designation, doj, workgroupcategory ,
workgroupcategoryname from addsegment1").collect().length == 40)
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(40)))
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(40)))
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ test("test filter queries on mixed formats table") {
+ sql("drop table if exists addsegment1")
+ sql("drop table if exists addsegment2")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+
+ sql(
+ """
+ | CREATE TABLE addsegment2 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int) using parquet
+ """.stripMargin)
+ sql(s"""insert into addsegment2 select * from addsegment1""")
+
+ val table = sqlContext.sparkSession.sessionState.catalog
+ .getTableMetadata(TableIdentifier("addsegment2"))
+ val path = table.location.getPath
+ val newPath = storeLocation + "/" + "addsegtest"
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ copy(path, newPath)
+
+ val res1 = sql("select empname, deptname from addsegment1 where deptno=10")
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='parquet')")
+
+ val res2 = sql("select * from addsegment1 where deptno=10")
+ assert(res1.collect().length == 6)
+ assert(res2.collect().length == 6)
+ assert(sql("select empname, deptname, deptno from addsegment1 where
empname = 'arvind'")
+ .collect().length == 2)
+
+ // For testing filter columns not in projection list
+ assert(sql("select deptname, deptno from addsegment1 where empname =
'arvind'")
+ .collect().length == 2)
+
+ assert(sql("select deptname, sum(salary) from addsegment1 where empname =
'arvind' group by deptname").collect().length == 1)
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+
+ test("Test show segments for added segment with different format") {
+ sql("drop table if exists addsegment1")
+ sql("drop table if exists addsegment2")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE addsegment2 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int) using parquet
+ """.stripMargin)
+
+ sql(s"""insert into addsegment2 select * from addsegment1""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ val table =
sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(
"addsegment2"))
+ val path = table.location.getPath
+ val newPath = storeLocation + "/" + "addsegtest"
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ copy(path, newPath)
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='parquet')").show()
+ checkExistence(sql(s"show segments for table addsegment1"), true,
"spark-common/target/warehouse/addsegtest")
+ checkExistence(sql(s"show history segments for table addsegment1"), true,
"spark-common/target/warehouse/addsegtest")
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ test("test parquet table") {
+ sql("drop table if exists addSegCar")
+ sql("drop table if exists addSegPar")
+ sql("drop table if exists addSegParless")
+ sql("drop table if exists addSegParmore")
+
+ sql("create table addSegCar(a int, b string) stored by 'carbondata'")
+ sql("create table addSegPar(a int, b string) using parquet")
+ sql("create table addSegParless(a int) using parquet")
+ sql("create table addSegParmore(a int, b string, c string) using parquet")
+
+ sql("insert into addSegCar values (1,'a')")
+ sql("insert into addSegPar values (2,'b')")
+ sql("insert into addSegParless values (3)")
+ sql("insert into addSegParmore values (4,'c', 'x')")
+
+ val table1 = sqlContext.sparkSession.sessionState.catalog
+ .getTableMetadata(TableIdentifier("addSegPar"))
+ val table2 = sqlContext.sparkSession.sessionState.catalog
+ .getTableMetadata(TableIdentifier("addSegParless"))
+ val table3 = sqlContext.sparkSession.sessionState.catalog
+ .getTableMetadata(TableIdentifier("addSegParmore"))
+
+ sql(s"alter table addSegCar add segment
options('path'='${table1.location.getPath}', 'format'='parquet')")
+ intercept[Exception] {
+ sql(s"alter table addSegCar add segment
options('path'='${table2.location.getPath}', 'format'='parquet')")
+ }
+ sql(s"alter table addSegCar add segment
options('path'='${table3.location.getPath}', 'format'='parquet')")
+
+ assert(sql("select * from addSegCar").collect().length == 3)
+
+ sql("drop table if exists addSegCar")
+ sql("drop table if exists addSegPar")
+ sql("drop table if exists addSegParless")
+ sql("drop table if exists addSegParmore")
+ }
+
+ private def copyseg(tableName: String, pathName: String): String = {
+ val table1 =
sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val path1 = table1.location.getPath
+ val newPath1 = storeLocation + "/" + pathName
+ FileFactory.deleteAllFilesOfDir(new File(newPath1))
+ copy(path1, newPath1)
+ newPath1
+ }
+
test("Test add segment by carbon written by sdk") {
val tableName = "add_segment_test"
sql(s"drop table if exists $tableName")
@@ -431,6 +756,10 @@ class AddSegmentTestCase extends QueryTest with
BeforeAndAfterAll {
def dropTable = {
sql("drop table if exists addsegment1")
sql("drop table if exists addsegment2")
+ sql("drop table if exists addSegCar")
+ sql("drop table if exists addSegPar")
+ sql("drop table if exists addSegParless")
+ sql("drop table if exists addSegParmore")
}
}
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index f348da7..ab8b91a 100644
---
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -21,6 +21,7 @@ import java.lang.Long
import scala.collection.JavaConverters._
+import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.util.CarbonException
@@ -84,6 +85,13 @@ object CarbonStore {
"NA"
}
+ val path =
+ if (StringUtils.isNotEmpty(load.getPath)) {
+ load.getPath
+ } else {
+ "NA"
+ }
+
val startTime =
if (load.getLoadStartTime ==
CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
"NA"
@@ -124,10 +132,11 @@ object CarbonStore {
startTime,
endTime,
mergedTo,
- load.getFileFormat.toString,
+ load.getFileFormat.toString.toUpperCase,
load.getVisibility,
Strings.formatSize(dataSize.toFloat),
- Strings.formatSize(indexSize.toFloat))
+ Strings.formatSize(indexSize.toFloat),
+ path)
} else {
Row(
load.getLoadName,
@@ -135,9 +144,10 @@ object CarbonStore {
startTime,
endTime,
mergedTo,
- load.getFileFormat.toString,
+ load.getFileFormat.toString.toUpperCase,
Strings.formatSize(dataSize.toFloat),
- Strings.formatSize(indexSize.toFloat))
+ Strings.formatSize(indexSize.toFloat),
+ path)
}
}.toSeq
} else {
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index bfbf52d..b0f90c7 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -87,7 +87,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
override def executeCompaction(): Unit = {
val sortedSegments: util.List[LoadMetadataDetails] = new
util.ArrayList[LoadMetadataDetails](
- carbonLoadModel.getLoadMetadataDetails
+
carbonLoadModel.getLoadMetadataDetails.asScala.filter(_.isCarbonFormat).asJava
)
CarbonDataMergerUtil.sortSegments(sortedSegments)
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 7edc50f..425e9de 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -50,7 +50,8 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
CarbonDataMergerUtil
.identifySegmentsToBeMerged(carbonLoadModel,
compactionModel.compactionSize,
- carbonLoadModel.getLoadMetadataDetails,
+ new util.ArrayList(
+
carbonLoadModel.getLoadMetadataDetails.asScala.filter(_.isCarbonFormat).asJava),
compactionModel.compactionType,
customSegmentIds)
}
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 6bc122c..34d22a7 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -24,8 +24,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
-import
org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
+import
org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil.convertSparkToCarbonDataType
import org.apache.spark.sql.execution.command.{Checker, MetadataCommand}
+import org.apache.spark.sql.execution.strategy.MixedFormatHandler
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.types.StructType
@@ -36,6 +37,7 @@ import
org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{FileFormat,
LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -43,7 +45,9 @@ import
org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMa
import
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent,
LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema,
CarbonLoadModel}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.sdk.file.{Field, Schema}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.clearDataMapFiles
+import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
@@ -73,6 +77,14 @@ case class CarbonAddLoadCommand(
throw new MalformedCarbonCommandException("Unsupported operation on non
transactional table")
}
+ if (carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala.exists(
+ c => c.hasEncoding(Encoding.DICTIONARY) &&
!c.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
+ throw new MalformedCarbonCommandException(
+ "Unsupported operation on global dictionary columns table")
+ }
+ if (carbonTable.isChildTable || carbonTable.isChildDataMap) {
+ throw new MalformedCarbonCommandException("Unsupported operation on
MV/Pre-aggrergated table")
+ }
// if insert overwrite in progress, do not allow add segment
if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "insert overwrite",
"delete segment")
@@ -80,10 +92,20 @@ case class CarbonAddLoadCommand(
val segmentPath = options.getOrElse(
"path", throw new UnsupportedOperationException("PATH is manadatory"))
- // TODO use the fileformat based on the format.
- val segSchema = new SparkCarbonFileFormat().inferSchema(sparkSession,
options, Seq.empty).get
+ val segSchema = MixedFormatHandler.getSchema(sparkSession, options,
segmentPath)
+
+ val segCarbonSchema = new Schema(segSchema.fields.map { field =>
+ val dataType = convertSparkToCarbonDataType(field.dataType)
+ new Field(field.name, dataType)
+ })
+
+ val tableCarbonSchema = new Schema(tableSchema.fields.map { field =>
+ val dataType = convertSparkToCarbonDataType(field.dataType)
+ new Field(field.name, dataType)
+ })
- if (!tableSchema.equals(segSchema)) {
+
+ if (!tableCarbonSchema.getFields.forall(f =>
segCarbonSchema.getFields.exists(_.equals(f)))) {
throw new AnalysisException(s"Schema is not same. Table schema is : " +
s"${tableSchema} and segment schema is :
${segSchema}")
}
@@ -121,7 +143,8 @@ case class CarbonAddLoadCommand(
false)
newLoadMetaEntry.setPath(segmentPath)
val format = options.getOrElse("format", "carbondata")
- if (!(format.equals("carbondata") || format.equals("carbon"))) {
+ val isCarbonFormat = format.equals("carbondata") || format.equals("carbon")
+ if (!isCarbonFormat) {
newLoadMetaEntry.setFileFormat(new FileFormat(format))
}
@@ -133,7 +156,11 @@ case class CarbonAddLoadCommand(
segmentPath,
new util.HashMap[String, String](options.asJava))
val writeSegment =
- SegmentFileStore.writeSegmentFile(carbonTable, segment)
+ if (isCarbonFormat) {
+ SegmentFileStore.writeSegmentFile(carbonTable, segment)
+ } else {
+ SegmentFileStore.writeSegmentFileForOthers(carbonTable, segment)
+ }
operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
model.getSegmentId)
@@ -201,5 +228,7 @@ case class CarbonAddLoadCommand(
Seq.empty
}
+
+
override protected def opName: String = "ADD SEGMENT WITH PATH"
}
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index dd12f34..9ce9cfb 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -43,7 +43,8 @@ case class CarbonShowLoadsCommand(
AttributeReference("File Format", StringType, nullable = false)(),
AttributeReference("Visibility", StringType, nullable = false)(),
AttributeReference("Data Size", StringType, nullable = false)(),
- AttributeReference("Index Size", StringType, nullable = false)())
+ AttributeReference("Index Size", StringType, nullable = false)(),
+ AttributeReference("Path", StringType, nullable = false)())
} else {
Seq(AttributeReference("SegmentSequenceId", StringType, nullable =
false)(),
AttributeReference("Status", StringType, nullable = false)(),
@@ -52,7 +53,8 @@ case class CarbonShowLoadsCommand(
AttributeReference("Merged To", StringType, nullable = false)(),
AttributeReference("File Format", StringType, nullable = false)(),
AttributeReference("Data Size", StringType, nullable = false)(),
- AttributeReference("Index Size", StringType, nullable = false)())
+ AttributeReference("Index Size", StringType, nullable = false)(),
+ AttributeReference("Path", StringType, nullable = false)())
}
}
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 5d238de..52411d1 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -40,9 +41,11 @@ import org.apache.spark.util.CarbonReflectionUtils
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.schema.BucketingInfo
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.datamap.{TextMatch, TextMatchLimit,
TextMatchMaxDocUDF, TextMatchUDF}
@@ -117,7 +120,9 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
val segmentUpdateStatusManager = new SegmentUpdateStatusManager(
relation.carbonRelation.metaData.carbonTable)
val updateDeltaMetadata = segmentUpdateStatusManager.readLoadMetadata()
- if (updateDeltaMetadata != null && updateDeltaMetadata.nonEmpty) {
+ val hasNonCarbonSegment =
+
segmentUpdateStatusManager.getLoadMetadataDetails.exists(!_.isCarbonFormat)
+ if (hasNonCarbonSegment || updateDeltaMetadata != null &&
updateDeltaMetadata.nonEmpty) {
false
} else if (relation.carbonTable.isStreamingSink) {
false
@@ -231,6 +236,10 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
partitions: Seq[PartitionSpec],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
ArrayBuffer[AttributeReference], Seq[PartitionSpec]) =>
RDD[InternalRow]) = {
+ val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ val extraRdd = MixedFormatHandler.extraRDD(relation, rawProjects,
filterPredicates,
+ new TableStatusReadCommittedScope(table.identifier,
FileFactory.getConfiguration),
+ table.identifier)
val projects = rawProjects.map {p =>
p.transform {
case CustomDeterministicExpression(exp) => exp
@@ -266,7 +275,6 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
// Combines all Catalyst filter `Expression`s that are either not
convertible to data source
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
- val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
val map = table.carbonRelation.metaData.dictionaryMap
val metadata: Map[String, String] = {
@@ -312,7 +320,12 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
// In case of more dictionary columns spark code gen needs generate lot of
code and that slows
// down the query, so we limit the direct fill in case of more dictionary
columns.
val hasMoreDictionaryCols =
hasMoreDictionaryColumnsOnProjection(projectSet, table)
- val vectorPushRowFilters =
CarbonProperties.getInstance().isPushRowFiltersForVector
+ var vectorPushRowFilters =
CarbonProperties.getInstance().isPushRowFiltersForVector
+ // In case of mixed formats, always set vectorPushRowFilters to false because
+ // filtering for the other formats happens in the Spark layer.
+ if (vectorPushRowFilters && extraRdd.isDefined) {
+ vectorPushRowFilters = false
+ }
if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
filterSet.subsetOf(projectSet)) {
@@ -340,15 +353,14 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
attr
}
val scan = getDataSourceScan(relation,
- updateProject,
- partitions,
+ (updateProject, partitions),
scanBuilder,
candidatePredicates,
pushedFilters,
handledFilters,
metadata,
needDecoder,
- updateRequestedColumns.asInstanceOf[Seq[Attribute]])
+ updateRequestedColumns.asInstanceOf[Seq[Attribute]], extraRdd)
// Check whether spark should handle row filters in case of vector flow.
if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
&& !hasDictionaryFilterCols && !hasMoreDictionaryCols) {
@@ -360,6 +372,9 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
}
filterPredicates.reduceLeftOption(expressions.And).map(execution.FilterExec(_,
scan))
.getOrElse(scan)
+ } else if (extraRdd.isDefined) {
+
filterPredicates.reduceLeftOption(expressions.And).map(execution.FilterExec(_,
scan))
+ .getOrElse(scan)
} else {
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
}
@@ -404,7 +419,7 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
if (!vectorPushRowFilters && !implicitExisted &&
!hasDictionaryFilterCols
&& !hasMoreDictionaryCols) {
updateRequestedColumnsFunc(
- (projectSet ++ filterSet).map(relation.attributeMap).toSeq,
+ (projectsAttr.to[mutable.LinkedHashSet] ++
filterSet).map(relation.attributeMap).toSeq,
table,
needDecoder)
} else {
@@ -413,22 +428,21 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
val supportBatch =
supportBatchedDataSource(relation.relation.sqlContext,
updateRequestedColumns.asInstanceOf[Seq[Attribute]]) &&
- needDecoder.isEmpty
+ needDecoder.isEmpty && extraRdd.getOrElse((null, true))._2
if (!vectorPushRowFilters && !supportBatch && !implicitExisted &&
!hasDictionaryFilterCols
&& !hasMoreDictionaryCols) {
// revert for row scan
updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns,
table, needDecoder)
}
val scan = getDataSourceScan(relation,
- updateRequestedColumns.asInstanceOf[Seq[Attribute]],
- partitions,
+ (updateRequestedColumns.asInstanceOf[Seq[Attribute]], partitions),
scanBuilder,
candidatePredicates,
pushedFilters,
handledFilters,
metadata,
needDecoder,
- updateRequestedColumns.asInstanceOf[Seq[Attribute]])
+ updateRequestedColumns.asInstanceOf[Seq[Attribute]], extraRdd)
// Check whether spark should handle row filters in case of vector flow.
if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
&& !implicitExisted && !hasDictionaryFilterCols &&
!hasMoreDictionaryCols) {
@@ -443,6 +457,12 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
needDecoder).asInstanceOf[Seq[NamedExpression]],
filterPredicates.reduceLeftOption(expressions.And).map(
execution.FilterExec(_, scan)).getOrElse(scan))
+ } else if (extraRdd.isDefined) {
+ execution.ProjectExec(
+ updateRequestedColumnsFunc(updatedProjects, table,
+ needDecoder).asInstanceOf[Seq[NamedExpression]],
+ filterPredicates.reduceLeftOption(expressions.And).map(
+ execution.FilterExec(_, scan)).getOrElse(scan))
} else {
execution.ProjectExec(
updateRequestedColumnsFunc(updatedProjects, table,
@@ -464,8 +484,7 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
}
private def getDataSourceScan(relation: LogicalRelation,
- output: Seq[Attribute],
- partitions: Seq[PartitionSpec],
+ outputAndPartitions: (Seq[Attribute], Seq[PartitionSpec]),
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
ArrayBuffer[AttributeReference], Seq[PartitionSpec])
=> RDD[InternalRow],
@@ -473,32 +492,34 @@ private[sql] class CarbonLateDecodeStrategy extends
SparkStrategy {
pushedFilters: Seq[Filter], handledFilters: Seq[Filter],
metadata: Map[String, String],
needDecoder: ArrayBuffer[AttributeReference],
- updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
+ updateRequestedColumns: Seq[Attribute],
+ extraRDD: Option[(RDD[InternalRow], Boolean)]): DataSourceScanExec = {
val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ var rdd = scanBuilder(updateRequestedColumns, candidatePredicates,
+ pushedFilters, needDecoder, outputAndPartitions._2)
if (supportBatchedDataSource(relation.relation.sqlContext,
updateRequestedColumns) &&
- needDecoder.isEmpty) {
+ needDecoder.isEmpty && extraRDD.getOrElse((rdd, true))._2) {
+ rdd = extraRDD.map(_._1.union(rdd)).getOrElse(rdd)
new CarbonDataSourceScan(
- output,
- scanBuilder(updateRequestedColumns,
- candidatePredicates,
- pushedFilters,
- needDecoder,
- partitions),
+ outputAndPartitions._1,
+ rdd,
createHadoopFSRelation(relation),
getPartitioning(table.carbonTable, updateRequestedColumns),
metadata,
relation.catalogTable.map(_.identifier), relation)
} else {
+ rdd match {
+ case cs: CarbonScanRDD[InternalRow] => cs.setVectorReaderSupport(false)
+ case _ =>
+ }
+ rdd = extraRDD.map(_._1.union(rdd)).getOrElse(rdd)
val partition = getPartitioning(table.carbonTable,
updateRequestedColumns)
- val rdd = scanBuilder(updateRequestedColumns, candidatePredicates,
- pushedFilters, needDecoder, partitions)
- CarbonReflectionUtils.getRowDataSourceScanExecObj(relation, output,
+ CarbonReflectionUtils.getRowDataSourceScanExecObj(relation,
outputAndPartitions._1,
pushedFilters, handledFilters,
rdd, partition, metadata)
}
}
-
def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
relation: CarbonDatasourceHadoopRelation,
needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
new file mode 100644
index 0000000..f04985c
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution
+import org.apache.spark.sql.SparkSession
+import
org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{FileFormat,
HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier,
SegmentFileStore}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.statusmanager.{FileFormat => FileFormatName,
SegmentStatus}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo,
SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object MixedFormatHandler {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ val supportedFormats: Seq[String] =
+ Seq("carbon", "carbondata", "parquet", "orc", "json", "csv", "text")
+
+ def validateFormat(format: String): Boolean = {
+ supportedFormats.exists(_.equalsIgnoreCase(format))
+ }
+
+ def getSchema(sparkSession: SparkSession,
+ options: Map[String, String],
+ segPath: String): StructType = {
+ val format = options.getOrElse("format", "carbondata")
+ if ((format.equals("carbondata") || format.equals("carbon"))) {
+ new SparkCarbonFileFormat().inferSchema(sparkSession, options,
Seq.empty).get
+ } else {
+ val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\",
"/"))
+ val path = new Path(filePath)
+ val fs =
path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+ val status = fs.listStatus(path, new PathFilter {
+ override def accept(path: Path): Boolean = {
+ !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc")
+ }
+ })
+ getFileFormat(new FileFormatName(format)).inferSchema(sparkSession,
options, status).get
+ }
+ }
+
+ /**
+ * Generates the RDD for non-carbon segments. It uses the underlying Spark
+ * file formats and builds the RDD through their native flow, so the
+ * original performance and features of those formats are preserved.
+ *
+ * If the segments span multiple formats (parquet, orc, etc.), an RDD is
+ * created per format and the resulting RDDs are unioned.
+ */
+ def extraRDD(l: LogicalRelation,
+ projects: Seq[NamedExpression],
+ filters: Seq[Expression],
+ readCommittedScope: ReadCommittedScope,
+ identier: AbsoluteTableIdentifier,
+ supportBatch: Boolean = true): Option[(RDD[InternalRow], Boolean)] = {
+ val loadMetadataDetails = readCommittedScope.getSegmentList
+ val segsToAccess = getSegmentsToAccess(identier)
+ val rdds = loadMetadataDetails.filterNot(l =>
+ l.getFileFormat.equals(FileFormatName.COLUMNAR_V3) ||
+ l.getFileFormat.equals(FileFormatName.ROW_V1) &&
+ (!(l.getSegmentStatus.equals(SegmentStatus.SUCCESS) &&
+ l.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS))))
+ .filter(l => segsToAccess.isEmpty ||
segsToAccess.contains(l.getLoadName))
+ .groupBy(_.getFileFormat)
+ .map { case (format, detailses) =>
+ val paths = detailses.flatMap { d =>
+
SegmentFileStore.readSegmentFile(CarbonTablePath.getSegmentFilePath(readCommittedScope
+ .getFilePath, d.getSegmentFile)).getLocationMap.asScala.flatMap {
case (p, f) =>
+ f.getFiles.asScala.map { ef =>
+ new Path(p + CarbonCommonConstants.FILE_SEPARATOR + ef)
+ }.toSeq
+ }.toSeq
+ }
+ val fileFormat = getFileFormat(format, supportBatch)
+ getRDDForExternalSegments(l, projects, filters, fileFormat, paths)
+ }
+ if (rdds.nonEmpty) {
+ if (rdds.size == 1) {
+ Some(rdds.head)
+ } else {
+ if (supportBatch && rdds.exists(!_._2)) {
+ extraRDD(l, projects, filters, readCommittedScope, identier, false)
+ } else {
+ var rdd: RDD[InternalRow] = null
+ rdds.foreach { r =>
+ if (rdd == null) {
+ rdd = r._1
+ } else {
+ rdd = rdd.union(r._1)
+ }
+ }
+ Some(rdd, !rdds.exists(!_._2))
+ }
+ }
+ } else {
+ None
+ }
+ }
+
+ def getFileFormat(fileFormat: FileFormatName, supportBatch: Boolean = true):
FileFormat = {
+ if (fileFormat.equals(new FileFormatName("parquet"))) {
+ new ExtendedParquetFileFormat(supportBatch)
+ } else if (fileFormat.equals(new FileFormatName("orc"))) {
+ new ExtendedOrcFileFormat(supportBatch)
+ } else if (fileFormat.equals(new FileFormatName("json"))) {
+ new JsonFileFormat
+ } else if (fileFormat.equals(new FileFormatName("csv"))) {
+ new CSVFileFormat
+ } else if (fileFormat.equals(new FileFormatName("text"))) {
+ new TextFileFormat
+ } else {
+ throw new UnsupportedOperationException("Format not supported " +
fileFormat)
+ }
+ }
+
+ class ExtendedParquetFileFormat(supportBatch: Boolean) extends
ParquetFileFormat {
+ override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
+ super.supportBatch(sparkSession, schema) && supportBatch
+ }
+ }
+
+ class ExtendedOrcFileFormat(supportBatch: Boolean) extends OrcFileFormat {
+ override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
+ super.supportBatch(sparkSession, schema) && supportBatch
+ }
+ }
+
+ /**
+ * Generates the RDD using the spark fileformat.
+ */
+ private def getRDDForExternalSegments(l: LogicalRelation,
+ projects: Seq[NamedExpression],
+ filters: Seq[Expression],
+ fileFormat: FileFormat,
+ paths: Seq[Path]): (RDD[InternalRow], Boolean) = {
+ val sparkSession = l.relation.sqlContext.sparkSession
+ val fs =
paths.head.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+ val s = paths.map { f =>
+ fs.getFileStatus(f)
+ }
+ val fsRelation = l.catalogTable match {
+ case Some(catalogTable) =>
+ val fileIndex =
+ new InMemoryFileIndex(sparkSession, paths,
catalogTable.storage.properties, None)
+ HadoopFsRelation(
+ fileIndex,
+ catalogTable.partitionSchema,
+ catalogTable.schema,
+ catalogTable.bucketSpec,
+ fileFormat,
+ catalogTable.storage.properties)(sparkSession)
+ case _ =>
+ HadoopFsRelation(
+ new InMemoryFileIndex(sparkSession, Seq.empty, Map.empty, None),
+ new StructType(),
+ l.relation.schema,
+ None,
+ fileFormat,
+ null)(sparkSession)
+ }
+
+ // Filters on this relation fall into four categories based on where we
can use them to avoid
+ // reading unneeded data:
+ // - partition keys only - used to prune directories to read
+ // - bucket keys only - optionally used to prune files to read
+ // - keys stored in the data only - optionally used to skip groups of
data in files
+ // - filters that need to be evaluated again after the scan
+ val filterSet = ExpressionSet(filters)
+
+ // The attribute name of predicate could be different than the one in
schema in case of
+ // case insensitive, we should change them to match the one in schema, so
we do not need to
+ // worry about case sensitivity anymore.
+ val normalizedFilters = filters.map { e =>
+ e transform {
+ case a: AttributeReference =>
+ a.withName(l.output.find(_.semanticEquals(a)).get.name)
+ }
+ }
+
+ val partitionColumns =
+ l.resolve(
+ fsRelation.partitionSchema,
fsRelation.sparkSession.sessionState.analyzer.resolver)
+ val partitionSet = AttributeSet(partitionColumns)
+ val partitionKeyFilters =
+ ExpressionSet(normalizedFilters
+ .filter(_.references.subsetOf(partitionSet)))
+
+ LOGGER.info(s"Pruning directories with: ${
partitionKeyFilters.mkString(",") }")
+
+ val dataColumns =
+ l.resolve(fsRelation.dataSchema,
fsRelation.sparkSession.sessionState.analyzer.resolver)
+
+ // Partition keys are not available in the statistics of the files.
+ val dataFilters =
normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
+
+ // Predicates with both partition keys and attributes need to be evaluated
after the scan.
+ val afterScanFilters = filterSet --
partitionKeyFilters.filter(_.references.nonEmpty)
+ LOGGER.info(s"Post-Scan Filters: ${ afterScanFilters.mkString(",") }")
+ val filterAttributes = AttributeSet(afterScanFilters)
+ val requiredExpressions = new util.LinkedHashSet[NamedExpression](
+ (projects.map(p => dataColumns.find(_.exprId == p.exprId).get) ++
+ filterAttributes.map(p => dataColumns.find(_.exprId ==
p.exprId).get)).asJava
+ ).asScala.toSeq
+ val readDataColumns =
+
requiredExpressions.filterNot(partitionColumns.contains).asInstanceOf[Seq[Attribute]]
+ val outputSchema = readDataColumns.toStructType
+ LOGGER.info(s"Output Data Schema: ${ outputSchema.simpleString(5) }")
+
+ val outputAttributes = readDataColumns ++ partitionColumns
+
+ val scan =
+ FileSourceScanExec(
+ fsRelation,
+ outputAttributes,
+ outputSchema,
+ partitionKeyFilters.toSeq,
+ dataFilters,
+ l.catalogTable.map(_.identifier))
+ val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
+ val withFilter = afterScanFilter.map(execution.FilterExec(_,
scan)).getOrElse(scan)
+ val withProjections = if (projects == withFilter.output) {
+ withFilter
+ } else {
+ execution.ProjectExec(projects, withFilter)
+ }
+ (withProjections.inputRDDs().head, fileFormat.supportBatch(sparkSession,
outputSchema))
+ }
+
+ def getSegmentsToAccess(identifier: AbsoluteTableIdentifier): Seq[String] = {
+ val carbonSessionInfo: CarbonSessionInfo = {
+ var info = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (info == null || info.getSessionParams == null) {
+ info = new CarbonSessionInfo
+ info.setSessionParams(new SessionParams())
+ }
+
info.getSessionParams.addProps(CarbonProperties.getInstance().getAddedProperty)
+ info
+ }
+ val tableUniqueKey = identifier.getDatabaseName + "." +
identifier.getTableName
+ val inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
tableUniqueKey
+ val segmentsStr = carbonSessionInfo.getThreadParams
+ .getProperty(inputSegmentsKey, carbonSessionInfo.getSessionParams
+ .getProperty(inputSegmentsKey,
+ CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")))
+ if (!segmentsStr.equals("*")) {
+ segmentsStr.split(",")
+ } else {
+ Seq.empty
+ }
+ }
+}