This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d08688a [CARBONDATA-3623]: Fixed global sort compaction failure on timestamp column
d08688a is described below
commit d08688a1e4164949c4761bf394a01752787457a8
Author: akkio-97 <[email protected]>
AuthorDate: Wed Dec 18 19:24:13 2019 +0530
[CARBONDATA-3623]: Fixed global sort compaction failure on timestamp column
Problem:
In CarbonData, timestamp values are converted to long and stored in the RDD.
When a collect (action) is applied on the DataFrame, it fails because of a
conflict between data types: the CarbonScanRDD (from which the DataFrame is
created) holds long values, while the schema declares the column as Timestamp.
Solution:
Create the DataFrame through Dataset.ofRows(LogicalPlan) on the table instead
of building it from the RDD. Set the segments to be compacted before the query
runs, and unset them once the compaction is finished.
This closes #3515
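
For context, a minimal sketch of the pattern this commit applies (the names
CarbonUtils.threadSet/threadUnset, CarbonCommonConstants.CARBON_INPUT_SEGMENTS
and SparkSQLUtil.createInputDataFrame come from the diff below; spark, table
and splits are assumed to already be in scope, as in CarbonTableCompactor):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.hadoop.CarbonInputSplit
    import org.apache.spark.sql.CarbonUtils
    import org.apache.spark.sql.util.SparkSQLUtil

    val segmentKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
      table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName
    try {
      // Restrict the scan to the segments that are being compacted.
      CarbonUtils.threadSet(segmentKey,
        splits.map(_.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
      // Build the DataFrame from the table's logical plan, so Spark handles the
      // timestamp conversion instead of validating CarbonScanRDD's long values
      // against a Timestamp schema.
      val dataFrame = SparkSQLUtil.createInputDataFrame(spark, table)
      // ... feed dataFrame into the global-sort load flow ...
    } finally {
      // Always clear the segment setting once the compaction query is done.
      CarbonUtils.threadUnset(segmentKey)
    }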
---
.../dataload/TestGlobalSortDataLoad.scala | 19 +++++
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 44 +---------
.../org/apache/spark/sql/util/SparkSQLUtil.scala | 17 ++++
.../spark/rdd/CarbonTableCompactor.scala | 66 ++++++++-------
.../management/CarbonInsertFromStageCommand.scala | 94 ++++++++++------------
5 files changed, 117 insertions(+), 123 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 2aad2f3..9882e15 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -140,6 +140,25 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
}
// ----------------------------------- Compaction -----------------------------------
+ test("compaction major: timestamp and long data type confliction")
+ {
+ sql("drop table if exists compactionTable")
+ sql("create table compactionTable (DOJ timestamp, DOB date) STORED BY
'org.apache.carbondata.format'")
+ sql("alter table compactionTable set tblproperties('sort_columns'='doj,
dob', 'sort_scope'='global_sort')")
+ sql("INSERT INTO compactionTable select '2017-10-12 21:22:23',
'1997-10-10'")
+ sql("INSERT INTO compactionTable select '2018-11-12 20:22:23',
'1997-10-10'")
+ sql("alter table compactionTable compact 'major'")
+ val showSegments = sql("show segments for table compactiontable").collect()
+ showSegments.find(_.get(0).toString.equals("0.1")) match {
+ case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Success"))
+ }
+ showSegments.find(_.get(0).toString.equals("1")) match {
+ case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Compacted"))
+ }
+ showSegments.find(_.get(0).toString.equals("0")) match {
+ case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Compacted"))
+ }
+ }
test("Compaction GLOBAL_SORT * 2") {
sql("DROP TABLE IF EXISTS carbon_localsort_twice")
sql(
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index ae859c0..dc97cd9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -22,20 +22,16 @@ import java.util.Comparator
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
+import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
@@ -44,7 +40,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
-import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
@@ -52,9 +47,8 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
+import org.apache.carbondata.spark.rdd.StringArrayRow
import org.apache.carbondata.spark.util.CommonUtil
-import org.apache.carbondata.store.CarbonRowReadSupport
/**
* Use sortBy operator in spark to load the data
@@ -429,38 +423,6 @@ object DataLoadProcessBuilderOnSpark {
}
loadModel
}
-
- /**
- * create DataFrame basing on specified splits
- */
- def createInputDataFrame(
- sparkSession: SparkSession,
- carbonTable: CarbonTable,
- splits: Seq[InputSplit]
- ): DataFrame = {
- val columns = carbonTable
- .getCreateOrderColumn
- .asScala
- .map(_.getColName)
- .toArray
- val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
- val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
- sparkSession,
- columnProjection = new CarbonProjection(columns),
- null,
- carbonTable.getAbsoluteTableIdentifier,
- carbonTable.getTableInfo.serialize,
- carbonTable.getTableInfo,
- new CarbonInputMetrics,
- null,
- classOf[SparkDataTypeConverterImpl],
- classOf[CarbonRowReadSupport],
- splits.asJava)
- .map { row =>
- new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
- }
- SparkSQLUtil.execute(rdd, schema, sparkSession)
- }
}
class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 13e7c45..056e555 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -35,6 +35,8 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
object SparkSQLUtil {
def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -257,4 +259,19 @@ object SparkSQLUtil {
taskGroupDesc
}
+ /**
+ * create DataFrame based on carbonTable
+ */
+ def createInputDataFrame(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable): DataFrame = {
+ /**
+ * [[org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType]] validates the
+ * datatype of column data and corresponding datatype in schema provided to create dataframe.
+ * Since carbonScanRDD gives Long data for timestamp column and corresponding column datatype in
+ * schema is Timestamp, this validation fails if we use createDataFrame API which takes rdd as
+ * input. Hence, using below API which creates dataframe from tablename.
+ */
+ sparkSession.sqlContext.table(carbonTable.getTableName)
+ }
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 56b6a2e..aeeec3a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -26,38 +26,30 @@ import scala.collection.mutable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputSplit, Job}
-import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext}
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.MergeIndexUtil
-import org.apache.spark.CarbonInputMetrics
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.datatype.{StructField, StructType}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat}
-import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.indexserver.DistributedRDDUtils
import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.TableOptionConstant
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.store.CarbonRowReadSupport
/**
* This class is used to perform compaction on carbon table.
@@ -374,26 +366,40 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
+ val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val splits = splitsOfSegments(
sparkSession,
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ table,
carbonMergerMapping.validSegments)
- val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(
- sparkSession,
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
- splits.asScala)
- // generate LoadModel which can be used global_sort flow
- val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
- sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
- outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
- DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
- sparkSession,
- Option(dataFrame),
- outputModel,
- SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
- .map { row =>
- (row._1, FailureCauses.NONE == row._2._2.failureCauses)
- }
+ var loadResult: Array[(String, Boolean)] = null
+ try {
+ CarbonUtils
+ .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
+ splits.asScala.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+ val dataFrame = SparkSQLUtil.createInputDataFrame(
+ sparkSession,
+ table)
+
+ // generate LoadModel which can be used global_sort flow
+ val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
+ sparkSession, table)
+ outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
+ loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+ sparkSession,
+ Option(dataFrame),
+ outputModel,
+ SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+ .map { row =>
+ (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+ }
+ } finally {
+ CarbonUtils
+ .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ table.getDatabaseName + "." +
+ table.getTableName)
+ }
+ loadResult
}
/**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 0cbccdd..a4dd45b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -28,16 +28,12 @@ import com.google.gson.Gson
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.log4j.Logger
-import org.apache.spark.CarbonInputMetrics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{CarbonEnv, CarbonUtils, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -46,13 +42,11 @@ import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileSt
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
/**
* Collect stage input files and trigger a loading into carbon table.
@@ -264,14 +258,25 @@ case class CarbonInsertFromStageCommand(
LOGGER.info(s"start to load ${splits.size} files into " +
s"${table.getDatabaseName}.${table.getTableName}")
val start = System.currentTimeMillis()
- val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
- DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
- spark,
- Option(dataFrame),
- loadModel,
- SparkSQLUtil.sessionState(spark).newHadoopConf()
- ).map { row =>
+ try {
+ CarbonUtils
+ .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
+ splits.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+ val dataFrame = SparkSQLUtil.createInputDataFrame(spark, table)
+ DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+ spark,
+ Option(dataFrame),
+ loadModel,
+ SparkSQLUtil.sessionState(spark).newHadoopConf()
+ ).map { row =>
(row._1, FailureCauses.NONE == row._2._2.failureCauses)
+ }
+ } finally {
+ CarbonUtils
+ .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ table.getDatabaseName + "." +
+ table.getTableName)
}
LOGGER.info(s"finish data loading, time taken
${System.currentTimeMillis() - start}ms")
@@ -311,15 +316,30 @@ case class CarbonInsertFromStageCommand(
val start = System.currentTimeMillis()
partitionDataList.map {
case (partition, splits) =>
- LOGGER.info(s"start to load ${splits.size} files into " +
- s"${table.getDatabaseName}.${table.getTableName}. " +
- s"Partition information: ${partition.mkString(",")}")
- val dataFrame = createInputDataFrameOfInternalRow(spark, table, splits)
+ LOGGER.info(s"start to load ${ splits.size } files into " +
+ s"${ table.getDatabaseName }.${ table.getTableName }. " +
+ s"Partition information: ${ partition.mkString(",") }")
+ val dataFrame = try {
+ // Segments should be set for query here, because consider a scenario where custom
+ // compaction is triggered, so it can happen that all the segments might be taken into
+ // consideration instead of custom segments if we do not set, leading to duplicate data in
+ // compacted segment. To avoid this, segments to be considered are to be set in threadset.
+ CarbonUtils
+ .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ table.getDatabaseName + CarbonCommonConstants.POINT +
+ table.getTableName,
+ splits.map(split => split.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+ SparkSQLUtil.createInputDataFrame(spark, table)
+ } finally {
+ CarbonUtils.threadUnset(
+ CarbonCommonConstants.CARBON_INPUT_SEGMENTS + table.getDatabaseName +
+ CarbonCommonConstants.POINT +
+ table.getTableName)
+ }
val columns = dataFrame.columns
val header = columns.mkString(",")
val selectColumns = columns.filter(!partition.contains(_))
val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
-
val loadCommand = CarbonLoadDataCommand(
databaseNameOp = Option(table.getDatabaseName),
tableName = table.getTableName,
@@ -337,7 +357,7 @@ case class CarbonInsertFromStageCommand(
)
loadCommand.run(spark)
}
- LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis()
- start}ms")
+ LOGGER.info(s"finish data loading, time taken ${
System.currentTimeMillis() - start }ms")
}
/**
@@ -481,35 +501,5 @@ case class CarbonInsertFromStageCommand(
}
}
- /**
- * create DataFrame basing on specified splits
- */
- private def createInputDataFrameOfInternalRow(
- sparkSession: SparkSession,
- carbonTable: CarbonTable,
- splits: Seq[InputSplit]
- ): DataFrame = {
- val columns = carbonTable
- .getCreateOrderColumn
- .asScala
- .map(_.getColName)
- .toArray
- val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
- val rdd: RDD[InternalRow] = new CarbonScanRDD[InternalRow](
- sparkSession,
- columnProjection = new CarbonProjection(columns),
- null,
- carbonTable.getAbsoluteTableIdentifier,
- carbonTable.getTableInfo.serialize,
- carbonTable.getTableInfo,
- new CarbonInputMetrics,
- null,
- classOf[SparkDataTypeConverterImpl],
- classOf[SparkRowReadSupportImpl],
- splits.asJava
- )
- SparkSQLUtil.execute(rdd, schema, sparkSession)
- }
-
override protected def opName: String = "INSERT STAGE"
}