This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 0112268 [CARBONDATA-4115] Successful load and insert will return segment ID
0112268 is described below
commit 01122680271380a1c1d8fa2b90b12a06dcab9791
Author: areyouokfreejoe <[email protected]>
AuthorDate: Sat Jan 30 17:41:20 2021 +0800
[CARBONDATA-4115] Successful load and insert will return segment ID
Why is this PR needed?
Currently, successful LOAD and INSERT SQL statements return an empty Seq in CarbonData; they should return the segment ID instead.
What changes were proposed in this PR?
Successful load and insert will return segment ID.
Does this PR introduce any user interface change?
Yes. (Successful load and insert will return segment ID.)
Is any new testcase added?
Yes
This closes #4086
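For illustration only, a minimal spark-shell sketch of the new behavior; the table name and CSV path below are hypothetical and not part of this patch:

    // Hypothetical table and file, used only to show the new return value.
    spark.sql("CREATE TABLE demo (id INT, name STRING) STORED AS carbondata")
    // Before this change both statements returned an empty result;
    // now each returns one row under the column "Segment ID", e.g. Row("0") for the first load.
    spark.sql("LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE demo").show()
    spark.sql("INSERT INTO demo SELECT 2, 'b'").show()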
---
.../apache/spark/sql/CarbonCatalystOperators.scala | 4 +++-
.../scala/org/apache/spark/sql/CarbonExtensions.scala | 4 +++-
.../command/management/CarbonInsertIntoCommand.scala | 19 ++++++++++++++++---
.../command/management/CarbonLoadDataCommand.scala | 16 +++++++++++++++-
.../command/management/CommonLoadUtils.scala | 4 ++--
.../apache/spark/sql/hive/CarbonAnalysisRules.scala | 14 +++++++++++++-
.../testsuite/dataload/TestLoadDataGeneral.scala | 5 ++++-
.../dataload/TestLoadDataWithAutoLoadMerge.scala | 5 ++++-
.../allqueries/InsertIntoCarbonTableTestCase.scala | 7 +++++--
.../StandardPartitionTableLoadingTestCase.scala | 10 ++++++++--
10 files changed, 73 insertions(+), 15 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 8630c49..c5fa35d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -70,7 +70,9 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
ifNotExists: Boolean)
extends Command {
- override def output: Seq[Attribute] = Seq.empty
+ override def output: Seq[Attribute] = {
+ Seq(AttributeReference("Segment ID", StringType, nullable = false)())
+ }
// This is the expected schema of the table prepared to be inserted into
// including dynamic partition columns.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
index 942935e..de6bbcb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonPreInsertionCasts}
+import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonLoadDataAnalyzeRule, CarbonPreInsertionCasts}
import org.apache.spark.sql.parser.CarbonExtensionSqlParser
/**
@@ -41,6 +41,8 @@ class CarbonExtensions extends (SparkSessionExtensions => Unit) {
.injectResolutionRule((session: SparkSession) => CarbonIUDAnalysisRule(session))
extensions
.injectResolutionRule((session: SparkSession) => CarbonPreInsertionCasts(session))
+ extensions
+ .injectResolutionRule((session: SparkSession) => CarbonLoadDataAnalyzeRule(session))
// carbon optimizer rules
extensions.injectPostHocResolutionRule((session: SparkSession) => CarbonOptimizerRule(session))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 80c41ba..bd643ed 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -26,11 +26,11 @@ import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, UpdateTableModel}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.CausedBy
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -186,6 +186,8 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
}
var isUpdateTableStatusRequired = false
val uuid = ""
+ var loadResultForReturn: LoadMetadataDetails = null
+ var rowsForReturn: Seq[Row] = Seq.empty
try {
val (tableIndexes, indexOperationContext) =
CommonLoadUtils.firePreLoadEvents(
@@ -248,6 +250,8 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
operationContext)
LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
val (rows, loadResult) = insertData(loadParams)
+ loadResultForReturn = loadResult
+ rowsForReturn = rows
val info = CommonLoadUtils.makeAuditInfo(loadResult)
setAuditInfo(info)
CommonLoadUtils.firePostLoadEvents(sparkSession,
@@ -276,7 +280,16 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
}
throw ex
}
- Seq.empty
+ if (loadResultForReturn != null && loadResultForReturn.getLoadName != null) {
+ Seq(Row(loadResultForReturn.getLoadName))
+ } else {
+ // return the segment id in partition table case
+ rowsForReturn
+ }
+ }
+
+ override val output: Seq[Attribute] = {
+ Seq(AttributeReference("Segment ID", StringType, nullable = false)())
}
private def isAlteredSchema(tableSchema: TableSchema): Boolean = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 299b17b..4ee0e75 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructType, TimestampType}
@@ -106,6 +107,8 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
dateFormat = df
var isUpdateTableStatusRequired = false
val uuid = ""
+ var loadResultForReturn: LoadMetadataDetails = null
+ var rowsForReturn: Seq[Row] = Seq.empty
try {
val (tableIndexes, indexOperationContext) =
CommonLoadUtils.firePreLoadEvents(
@@ -158,6 +161,8 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
None,
operationContext)
val (rows, loadResult) = loadData(loadParams)
+ loadResultForReturn = loadResult
+ rowsForReturn = rows
val info = CommonLoadUtils.makeAuditInfo(loadResult)
setAuditInfo(info)
CommonLoadUtils.firePostLoadEvents(sparkSession,
@@ -191,7 +196,16 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
throw ex
}
}
- Seq.empty
+ if (loadResultForReturn != null && loadResultForReturn.getLoadName != null) {
+ Seq(Row(loadResultForReturn.getLoadName))
+ } else {
+ // return the segment id in partition table case
+ rowsForReturn
+ }
+ }
+
+ override val output: Seq[Attribute] = {
+ Seq(AttributeReference("Segment ID", StringType, nullable = false)())
}
def loadData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index d22a1e0..404cbde 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1110,12 +1110,12 @@ object CommonLoadUtils {
SegmentFileStore.getPartitionSpecs(loadParams.carbonLoadModel.getSegmentId,
loadParams.carbonLoadModel.getTablePath,
loadParams.carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
- if (specs != null) {
+ if (specs != null && !specs.isEmpty) {
specs.asScala.map { spec =>
Row(spec.getPartitions.asScala.mkString("/"),
spec.getLocation.toString, spec.getUuid)
}
} else {
- Seq.empty[Row]
+ Seq(Row(loadParams.carbonLoadModel.getSegmentId))
}
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 6ff2573..8261415 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -27,9 +27,10 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedE
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.LoadDataCommand
import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.strategy.CarbonPlanHelper
+import org.apache.spark.sql.execution.strategy.{CarbonPlanHelper, DMLHelper}
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
@@ -227,6 +228,17 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
}
}
+case class CarbonLoadDataAnalyzeRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transform {
+ case p: LogicalPlan if !p.childrenResolved => p
+ case loadData: LoadDataCommand
+ if CarbonPlanHelper.isCarbonTable(loadData.table, sparkSession) =>
+ DMLHelper.loadData(loadData)
+ }
+ }
+}
+
/**
* Insert into carbon table from other source
*/
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 61aef9d..29f8028 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -63,7 +63,10 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
test("test data loading CSV file") {
val testData = s"$resourcesPath/sample.csv"
- sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+ checkAnswer(
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest"),
+ Seq(Row("0"))
+ )
checkAnswer(
sql("SELECT COUNT(*) FROM loadtest"),
Seq(Row(6))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
index ae37351..ff6884d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
@@ -42,7 +42,10 @@ class TestLoadDataWithAutoLoadMerge extends QueryTest with BeforeAndAfterAll {
test("test data loading with auto load merge") {
val testData = s"$resourcesPath/sample.csv"
- sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge")
+ checkAnswer(
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge"),
+ Seq(Row("0"))
+ )
checkAnswer(
sql("SELECT COUNT(*) FROM automerge"),
Seq(Row(6))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 0d167fe..629ba38 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -16,7 +16,7 @@
*/
package org.apache.carbondata.spark.testsuite.allqueries
-import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
@@ -44,7 +44,10 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
test("insert from hive") {
sql("drop table if exists TCarbon")
sql("create table TCarbon (imei string,deviceInformationId int,MAC
string,deviceColor string,device_backColor string,modelId string,marketName
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series
string,productionDate timestamp,bomCode string,internalModels string,
deliveryTime string, channelsId string, channelsName string , deliveryAreaId
string, deliveryCountry string, deliveryProvince string, deliveryCity
string,deliveryDistrict string, deliveryStreet str [...]
- sql("insert into TCarbon select * from THive")
+ checkAnswer(
+ sql("insert into TCarbon select * from THive"),
+ Seq(Row("0"))
+ )
checkAnswer(
sql("select
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Acti
[...]
sql("select
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Acti
[...]
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index c8a0926..09609e0 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -95,7 +95,10 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
| PARTITIONED BY (empno int)
| STORED AS carbondata
""".stripMargin)
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""),
+ Seq(Row("0"))
+ )
validateDataFiles("default_partitionone", "0", 10)
@@ -114,7 +117,10 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
| PARTITIONED BY (doj Timestamp, empname String)
| STORED AS carbondata
""".stripMargin)
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""),
+ Seq(Row("0"))
+ )
validateDataFiles("default_partitiontwo", "0", 10)