This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0112268  [CARBONDATA-4115] Successful load and insert will return segment ID
0112268 is described below

commit 01122680271380a1c1d8fa2b90b12a06dcab9791
Author: areyouokfreejoe <[email protected]>
AuthorDate: Sat Jan 30 17:41:20 2021 +0800

    [CARBONDATA-4115] Successful load and insert will return segment ID
    
    Why is this PR needed?
    Currently, a successful load or insert SQL statement returns an empty Seq in CarbonData; it should return the segment ID instead.
    
    What changes were proposed in this PR?
    A successful load or insert will now return the segment ID.
    
    Does this PR introduce any user interface change?
    Yes. (A successful load or insert now returns the segment ID.)
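    As a rough illustration of the new behavior (the table name, file path and
    session setup below are only an example, not part of this patch), a load or
    insert on a CarbonData table now returns a single "Segment ID" column:

        // assumes a SparkSession created with CarbonExtensions enabled
        spark.sql("CREATE TABLE demo (id INT, name STRING) STORED AS carbondata")
        val loadResult = spark.sql("LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE demo")
        loadResult.show()       // prints one "Segment ID" column, e.g. "0" for the first load
        val insertResult = spark.sql("INSERT INTO demo SELECT 1, 'a'")
        insertResult.collect()  // e.g. Array(Row("1")) -- the id of the newly created segment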
    
    Is any new testcase added?
    Yes
    
    This closes #4086
---
 .../apache/spark/sql/CarbonCatalystOperators.scala    |  4 +++-
 .../scala/org/apache/spark/sql/CarbonExtensions.scala |  4 +++-
 .../command/management/CarbonInsertIntoCommand.scala  | 19 ++++++++++++++++---
 .../command/management/CarbonLoadDataCommand.scala    | 16 +++++++++++++++-
 .../command/management/CommonLoadUtils.scala          |  4 ++--
 .../apache/spark/sql/hive/CarbonAnalysisRules.scala   | 14 +++++++++++++-
 .../testsuite/dataload/TestLoadDataGeneral.scala      |  5 ++++-
 .../dataload/TestLoadDataWithAutoLoadMerge.scala      |  5 ++++-
 .../allqueries/InsertIntoCarbonTableTestCase.scala    |  7 +++++--
 .../StandardPartitionTableLoadingTestCase.scala       | 10 ++++++++--
 10 files changed, 73 insertions(+), 15 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 8630c49..c5fa35d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -70,7 +70,9 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
     ifNotExists: Boolean)
   extends Command {
 
-    override def output: Seq[Attribute] = Seq.empty
+    override def output: Seq[Attribute] = {
+      Seq(AttributeReference("Segment ID", StringType, nullable = false)())
+    }
 
     // This is the expected schema of the table prepared to be inserted into
     // including dynamic partition columns.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
index 942935e..de6bbcb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonPreInsertionCasts}
+import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonLoadDataAnalyzeRule, CarbonPreInsertionCasts}
 import org.apache.spark.sql.parser.CarbonExtensionSqlParser
 
 /**
@@ -41,6 +41,8 @@ class CarbonExtensions extends (SparkSessionExtensions => Unit) {
       .injectResolutionRule((session: SparkSession) => CarbonIUDAnalysisRule(session))
     extensions
       .injectResolutionRule((session: SparkSession) => CarbonPreInsertionCasts(session))
+    extensions
+      .injectResolutionRule((session: SparkSession) => CarbonLoadDataAnalyzeRule(session))
 
     // carbon optimizer rules
     extensions.injectPostHocResolutionRule((session: SparkSession) => CarbonOptimizerRule(session))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 80c41ba..bd643ed 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -26,11 +26,11 @@ import scala.collection.mutable
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.util.CausedBy
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -186,6 +186,8 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
     }
     var isUpdateTableStatusRequired = false
     val uuid = ""
+    var loadResultForReturn: LoadMetadataDetails = null
+    var rowsForReturn: Seq[Row] = Seq.empty
     try {
       val (tableIndexes, indexOperationContext) =
         CommonLoadUtils.firePreLoadEvents(
@@ -248,6 +250,8 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
         operationContext)
       LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
       val (rows, loadResult) = insertData(loadParams)
+      loadResultForReturn = loadResult
+      rowsForReturn = rows
       val info = CommonLoadUtils.makeAuditInfo(loadResult)
       setAuditInfo(info)
       CommonLoadUtils.firePostLoadEvents(sparkSession,
@@ -276,7 +280,16 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
         }
         throw ex
     }
-    Seq.empty
+    if (loadResultForReturn != null && loadResultForReturn.getLoadName != null) {
+      Seq(Row(loadResultForReturn.getLoadName))
+    } else {
+      // return the segment id in partition table case
+      rowsForReturn
+    }
+  }
+
+  override val output: Seq[Attribute] = {
+    Seq(AttributeReference("Segment ID", StringType, nullable = false)())
   }
 
   private def isAlteredSchema(tableSchema: TableSchema): Boolean = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 299b17b..4ee0e75 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructType, TimestampType}
@@ -106,6 +107,8 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
     dateFormat = df
     var isUpdateTableStatusRequired = false
     val uuid = ""
+    var loadResultForReturn: LoadMetadataDetails = null
+    var rowsForReturn: Seq[Row] = Seq.empty
     try {
       val (tableIndexes, indexOperationContext) =
         CommonLoadUtils.firePreLoadEvents(
@@ -158,6 +161,8 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
         None,
         operationContext)
       val (rows, loadResult) = loadData(loadParams)
+      loadResultForReturn = loadResult
+      rowsForReturn = rows
       val info = CommonLoadUtils.makeAuditInfo(loadResult)
       setAuditInfo(info)
       CommonLoadUtils.firePostLoadEvents(sparkSession,
@@ -191,7 +196,16 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
           throw ex
         }
     }
-    Seq.empty
+    if (loadResultForReturn != null && loadResultForReturn.getLoadName != null) {
+      Seq(Row(loadResultForReturn.getLoadName))
+    } else {
+      // return the segment id in partition table case
+      rowsForReturn
+    }
+  }
+
+  override val output: Seq[Attribute] = {
+    Seq(AttributeReference("Segment ID", StringType, nullable = false)())
   }
 
   def loadData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index d22a1e0..404cbde 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1110,12 +1110,12 @@ object CommonLoadUtils {
       SegmentFileStore.getPartitionSpecs(loadParams.carbonLoadModel.getSegmentId,
         loadParams.carbonLoadModel.getTablePath,
         loadParams.carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
-    if (specs != null) {
+    if (specs != null && !specs.isEmpty) {
       specs.asScala.map { spec =>
         Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
       }
     } else {
-      Seq.empty[Row]
+      Seq(Row(loadParams.carbonLoadModel.getSegmentId))
     }
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 6ff2573..8261415 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -27,9 +27,10 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedE
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.LoadDataCommand
 import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.strategy.CarbonPlanHelper
+import org.apache.spark.sql.execution.strategy.{CarbonPlanHelper, DMLHelper}
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
@@ -227,6 +228,17 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
   }
 }
 
+case class CarbonLoadDataAnalyzeRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transform {
+      case p: LogicalPlan if !p.childrenResolved => p
+      case loadData: LoadDataCommand
+        if CarbonPlanHelper.isCarbonTable(loadData.table, sparkSession) =>
+        DMLHelper.loadData(loadData)
+    }
+  }
+}
+
 /**
  * Insert into carbon table from other source
  */
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 61aef9d..29f8028 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -63,7 +63,10 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
 
   test("test data loading CSV file") {
     val testData = s"$resourcesPath/sample.csv"
-    sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+    checkAnswer(
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest"),
+      Seq(Row("0"))
+    )
     checkAnswer(
       sql("SELECT COUNT(*) FROM loadtest"),
       Seq(Row(6))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
index ae37351..ff6884d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
@@ -42,7 +42,10 @@ class TestLoadDataWithAutoLoadMerge extends QueryTest with BeforeAndAfterAll {
 
   test("test data loading with auto load merge") {
     val testData = s"$resourcesPath/sample.csv"
-    sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge")
+    checkAnswer(
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge"),
+      Seq(Row("0"))
+    )
     checkAnswer(
       sql("SELECT COUNT(*) FROM automerge"),
       Seq(Row(6))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 0d167fe..629ba38 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.spark.testsuite.allqueries
 
-import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -44,7 +44,10 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   test("insert from hive") {
      sql("drop table if exists TCarbon")
      sql("create table TCarbon (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet str [...]
-     sql("insert into TCarbon select * from THive")
+     checkAnswer(
+       sql("insert into TCarbon select * from THive"),
+        Seq(Row("0"))
+     )
      checkAnswer(
          sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Acti [...]
          sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Acti [...]
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index c8a0926..09609e0 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -95,7 +95,10 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
         | PARTITIONED BY (empno int)
         | STORED AS carbondata
       """.stripMargin)
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""),
+      Seq(Row("0"))
+    )
 
     validateDataFiles("default_partitionone", "0", 10)
 
@@ -114,7 +117,10 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
         | PARTITIONED BY (doj Timestamp, empname String)
         | STORED AS carbondata
       """.stripMargin)
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""),
+      Seq(Row("0"))
+    )
 
     validateDataFiles("default_partitiontwo", "0", 10)
 
