This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new eb0848f [HOTFIX] Fix InsertFromStage complex data type issue for partition table
eb0848f is described below
commit eb0848f882ca9b80c53a7ba00179d4e70ebe68af
Author: liuzhi <[email protected]>
AuthorDate: Thu Jan 2 22:43:55 2020 +0800
[HOTFIX] Fix InsertFromStage complex data type issue for partition table
Why is this PR needed?
CarbonInsertFromStageCommand does not work correctly with complex data types.
What changes were proposed in this PR?
For partition tables, the complex data types of the target table should be converted to the binary data type.
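As a rough illustration only (this is not code from the commit), the conversion can be sketched with plain Spark SQL types: complex columns of the target schema are rewritten to BinaryType before the DataFrame for the partition load path is built. The helper name complexToBinary is hypothetical.

import org.apache.spark.sql.types._

// Hypothetical sketch: carry struct/array/map columns as serialized bytes
// by rewriting their Spark SQL data type to BinaryType.
def complexToBinary(schema: StructType): StructType = {
  StructType(schema.fields.map { field =>
    field.dataType match {
      case _: StructType | _: ArrayType | _: MapType => field.copy(dataType = BinaryType)
      case _ => field
    }
  })
}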
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3556
---
.../apache/carbondata/core/util/DataTypeUtil.java | 33 ++---
.../org/apache/carbon/flink/CarbonLocalWriter.java | 3 +
.../org/apache/carbon/flink/CarbonS3Writer.java | 3 +
.../carbon/flink/TestCarbonPartitionWriter.scala | 103 +++++++++++++--
.../org/apache/carbon/flink/TestCarbonWriter.scala | 10 +-
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 20 +--
.../org/apache/spark/sql/util/SparkSQLUtil.scala | 10 +-
.../management/CarbonInsertFromStageCommand.scala | 140 +++++++++++----------
.../command/management/CarbonLoadDataCommand.scala | 29 +----
9 files changed, 214 insertions(+), 137 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c07f08b..5471420 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -75,7 +75,7 @@ public final class DataTypeUtil {
/**
* DataType converter for different computing engines
*/
- private static final ThreadLocal<DataTypeConverter> converter = new ThreadLocal<>();
+ private static DataTypeConverter converter;
/**
* This method will convert a given value to its specific type
@@ -84,8 +84,8 @@ public final class DataTypeUtil {
* @param dataType
* @return
*/
- public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
- int scale, int precision) {
+ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType, int scale,
+ int precision) {
return getMeasureValueBasedOnDataType(msrValue, dataType, scale, precision, false);
}
@@ -105,7 +105,7 @@ public final class DataTypeUtil {
new BigDecimal(msrValue).setScale(scale, RoundingMode.HALF_UP);
BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
if (useConverter) {
- return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
+ return converter.convertFromBigDecimalToDecimal(decimal);
} else {
return decimal;
}
@@ -140,11 +140,10 @@ public final class DataTypeUtil {
if (dataType == DataTypes.BOOLEAN) {
return BooleanConvert.parseBoolean(dimValue);
} else if (DataTypes.isDecimal(dataType)) {
- BigDecimal bigDecimal =
- new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
+ BigDecimal bigDecimal = new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
if (useConverter) {
- return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
+ return converter.convertFromBigDecimalToDecimal(decimal);
} else {
return decimal;
}
@@ -457,7 +456,7 @@ public final class DataTypeUtil {
}
} else {
// Default action for String/Varchar
- return getDataTypeConverter().convertFromStringToUTF8String(dimensionValue);
+ return converter.convertFromStringToUTF8String(dimensionValue);
}
}
@@ -518,7 +517,7 @@ public final class DataTypeUtil {
} else if (actualDataType == DataTypes.LONG) {
return ByteUtil.toXorBytes((Long) dimensionValue);
} else if (actualDataType == DataTypes.TIMESTAMP) {
- return ByteUtil.toXorBytes((Long)dimensionValue);
+ return ByteUtil.toXorBytes((Long) dimensionValue);
} else {
// Default action for String/Varchar
return ByteUtil.toBytes(dimensionValue.toString());
@@ -970,11 +969,12 @@ public final class DataTypeUtil {
/**
* set the data type converter as per computing engine
+ *
* @param converterLocal
*/
public static void setDataTypeConverter(DataTypeConverter converterLocal) {
if (converterLocal != null) {
- converter.set(converterLocal);
+ converter = converterLocal;
timeStampformatter.remove();
dateformatter.remove();
}
@@ -989,17 +989,10 @@ public final class DataTypeUtil {
}
public static DataTypeConverter getDataTypeConverter() {
- DataTypeConverter dataTypeConverter = converter.get();
- if (dataTypeConverter == null) {
- synchronized (converter) {
- dataTypeConverter = converter.get();
- if (dataTypeConverter == null) {
- dataTypeConverter = new DataTypeConverterImpl();
- converter.set(dataTypeConverter);
- }
- }
+ if (converter == null) {
+ converter = new DataTypeConverterImpl();
}
- return dataTypeConverter;
+ return converter;
}
public static DataType valueOf(String name) {
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index c24c3bf..ac39bd0 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -113,6 +113,9 @@ final class CarbonLocalWriter extends CarbonWriter {
synchronized (this) {
if (!this.flushed) {
this.closeWriters();
+ this.commit();
+ this.writerFactory.reset();
+ this.writeCount.set(0);
this.flushed = true;
}
}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index 0c8ccbd..1d3ec6b 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -120,6 +120,9 @@ final class CarbonS3Writer extends CarbonWriter {
synchronized (this) {
if (!this.flushed) {
this.closeWriters();
+ this.commit();
+ this.writerFactory.reset();
+ this.writeCount.set(0);
this.flushed = true;
}
}
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index cc3c4b4..447e83e 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -71,11 +71,11 @@ class TestCarbonPartitionWriter extends QueryTest {
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
- val dataCount = 10000
+ val dataCount = 1000
val source = new TestSource(dataCount) {
@throws[InterruptedException]
override def get(index: Int): Array[AnyRef] = {
- val data = new Array[AnyRef](5)
+ val data = new Array[AnyRef](7)
data(0) = "test" + index
data(1) = index.asInstanceOf[AnyRef]
data(2) = 12345.asInstanceOf[AnyRef]
@@ -86,7 +86,7 @@ class TestCarbonPartitionWriter extends QueryTest {
@throws[InterruptedException]
override def onFinish(): Unit = {
- Thread.sleep(30000L)
+ Thread.sleep(5000L)
}
}
val stream = environment.addSource(source)
@@ -118,18 +118,99 @@ class TestCarbonPartitionWriter extends QueryTest {
assertResult(false)(FileFactory
.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
- // ensure the carbon data file count in data directory
- // is same of the data file count which stage files recorded.
- assertResult(true)(FileFactory.getCarbonFile(dataLocation).listFiles().length ==
- collectStageInputs(CarbonTablePath.getStageDir(tablePath)).map(
- stageInput =>
- stageInput.getLocations.asScala.map(location => location.getFiles.size()).sum
- ).sum
+ sql(s"INSERT INTO $tableName STAGE")
+
+ checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+
+ } finally {
+ sql(s"drop table if exists $tableName").collect()
+ delDir(new File(dataPath))
+ }
+ }
+
+ @Test
+ def testComplexType(): Unit = {
+ sql(s"drop table if exists $tableName").collect()
+ sql(
+ s"""
+ | CREATE TABLE $tableName (stringField string, intField int, shortField short,
+ | structField struct<value1:string,value2:int,value3:int>, binaryField struct<value1:binary>)
+ | STORED AS carbondata
+ | PARTITIONED BY (hour_ string, date_ string)
+ | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin
+ ).collect()
+
+ val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+ val dataTempPath = rootPath + "/data/temp/"
+ val dataPath = rootPath + "/data/"
+ delDir(new File(dataPath))
+ new File(dataPath).mkdir()
+
+ try {
+ val tablePath = storeLocation + "/" + tableName + "/"
+
+ val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+ val carbonProperties = newCarbonProperties(storeLocation)
+
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ environment.setParallelism(6)
+ environment.enableCheckpointing(2000L)
+ environment.setRestartStrategy(RestartStrategies.noRestart)
+
+ val dataCount = 1000
+ val source = new TestSource(dataCount) {
+ @throws[InterruptedException]
+ override def get(index: Int): Array[AnyRef] = {
+ val data = new Array[AnyRef](7)
+ data(0) = "test" + index
+ data(1) = index.asInstanceOf[AnyRef]
+ data(2) = 12345.asInstanceOf[AnyRef]
+ data(3) = "test\0011\0012"
+ data(4) = "test"
+ data(5) = Integer.toString(TestSource.randomCache.get().nextInt(24))
+ data(6) = "20191218"
+ data
+ }
+
+ @throws[InterruptedException]
+ override def onFinish(): Unit = {
+ Thread.sleep(5000L)
+ }
+ }
+ val stream = environment.addSource(source)
+ val factory = CarbonWriterFactory.builder("Local").build(
+ "default",
+ tableName,
+ tablePath,
+ new Properties,
+ writerProperties,
+ carbonProperties
)
+ val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+ stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
+ override def getKey(value: Array[AnyRef]): AnyRef = value(3) // return hour_
+ }).addSink(streamSink)
+
+ try environment.execute
+ catch {
+ case exception: Exception =>
+ // TODO
+ throw new UnsupportedOperationException(exception)
+ }
+
+ val dataLocation = dataPath + "default" + CarbonCommonConstants.FILE_SEPARATOR +
+ tableName + CarbonCommonConstants.FILE_SEPARATOR
+
+ assertResult(true)(FileFactory.isFileExist(dataLocation))
+ assertResult(false)(FileFactory
+ .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
sql(s"INSERT INTO $tableName STAGE")
- checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+ checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
} finally {
sql(s"drop table if exists $tableName").collect()
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 67c7bab..9195863 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -64,7 +64,7 @@ class TestCarbonWriter extends QueryTest {
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
- val dataCount = 10000
+ val dataCount = 1000
val source = new TestSource(dataCount) {
@throws[InterruptedException]
override def get(index: Int): Array[AnyRef] = {
@@ -103,7 +103,7 @@ class TestCarbonWriter extends QueryTest {
sql(s"INSERT INTO $tableName STAGE")
- checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+ checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
// ensure the stage snapshot file and all stage files are deleted
assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
@@ -116,9 +116,9 @@ class TestCarbonWriter extends QueryTest {
}
private def newWriterProperties(
- dataTempPath: String,
- dataPath: String,
- storeLocation: String) = {
+ dataTempPath: String,
+ dataPath: String,
+ storeLocation: String) = {
val properties = new Properties
properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index f7b8668..bb5e946 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -26,15 +26,17 @@ import org.apache.hadoop.mapreduce.InputSplit
import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -99,7 +101,7 @@ object DataLoadProcessBuilderOnSpark {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
convertStepRowCounter)
- }.filter(_ != null)// Filter the bad record
+ }.filter(_ != null) // Filter the bad record
// 3. Sort
val configuration = DataLoadProcessBuilder.createConfiguration(model)
@@ -269,7 +271,7 @@ object DataLoadProcessBuilderOnSpark {
val configuration = DataLoadProcessBuilder.createConfiguration(model)
val header = configuration.getHeader
val rangeColumn = model.getRangePartitionColumn
- val rangeColumnIndex = (0 until header.length).find{
+ val rangeColumnIndex = (0 until header.length).find {
index =>
header(index).equalsIgnoreCase(rangeColumn.getColName)
}.get
@@ -427,7 +429,7 @@ object DataLoadProcessBuilderOnSpark {
.map(_.getColName)
.toArray
val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
- val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
+ val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
sparkSession,
columnProjection = new CarbonProjection(columns),
null,
@@ -436,13 +438,13 @@ object DataLoadProcessBuilderOnSpark {
carbonTable.getTableInfo,
new CarbonInputMetrics,
null,
- null,
+ classOf[SparkDataTypeConverterImpl],
classOf[CarbonRowReadSupport],
splits.asJava)
.map { row =>
- new GenericRow(row.getData.asInstanceOf[Array[Any]])
+ new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
}
- sparkSession.createDataFrame(rdd, schema)
+ SparkSQLUtil.execute(rdd, schema, sparkSession)
}
}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 8f39f9b..13e7c45 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -22,13 +22,17 @@ import java.lang.reflect.Method
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.EmptyRule
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Cast, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
object SparkSQLUtil {
@@ -38,6 +42,10 @@ object SparkSQLUtil {
Dataset.ofRows(sparkSession, logicalPlan)
}
+ def execute(rdd: RDD[InternalRow], schema: StructType, sparkSession: SparkSession): DataFrame = {
+ execute(LogicalRDD(schema.toAttributes, rdd)(sparkSession), sparkSession)
+ }
+
def getSparkSession: SparkSession = {
SparkSession.getDefaultSession.get
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index eb63d03..24e7765 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -32,12 +32,12 @@ import org.apache.spark.CarbonInputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -85,7 +85,6 @@ case class CarbonInsertFromStageCommand(
val tablePath = table.getTablePath
val stagePath = CarbonTablePath.getStageDir(tablePath)
val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
- var loadModel: CarbonLoadModel = null
val lock = acquireIngestLock(table)
try {
@@ -133,44 +132,21 @@ case class CarbonInsertFromStageCommand(
val executorService = Executors.newFixedThreadPool(numThreads)
val stageInputs = collectStageInputs(executorService, stageFiles)
- // 3) add new segment with INSERT_IN_PROGRESS into table status
- loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
- CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
-
- // 4) write all existing stage file names and segmentId into a new snapshot file
- // The content of snapshot file is: first line is segmentId, followed by each line is
- // one stage file name
- val content =
- (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
- FileFactory.writeFile(content, snapshotFilePath)
-
- // 5) perform data loading
+ // 3) perform data loading
if (table.isHivePartitionTable) {
- startLoadingWithPartition(spark, table, loadModel, stageInputs)
+ startLoadingWithPartition(spark, table, stageInputs, stageFiles, snapshotFilePath)
} else {
- startLoading(spark, table, loadModel, stageInputs)
+ startLoading(spark, table, stageInputs, stageFiles, snapshotFilePath)
}
- // 6) write segment file and update the segment entry to SUCCESS
- val segmentFileName = SegmentFileStore.writeSegmentFile(
- table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
- SegmentFileStore.updateTableStatusFile(
- table, loadModel.getSegmentId, segmentFileName,
- table.getCarbonTableIdentifier.getTableId,
- new SegmentFileStore(table.getTablePath, segmentFileName),
- SegmentStatus.SUCCESS)
-
- // 7) delete stage files
+ // 4) delete stage files
deleteStageFiles(executorService, stageFiles)
- // 8) delete the snapshot file
+ // 5) delete the snapshot file
FileFactory.getCarbonFile(snapshotFilePath).delete()
} catch {
case ex: Throwable =>
LOGGER.error(s"failed to insert
${table.getDatabaseName}.${table.getTableName}", ex)
- if (loadModel != null) {
- CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
- }
throw ex
} finally {
lock.unlock()
@@ -266,24 +242,55 @@ case class CarbonInsertFromStageCommand(
private def startLoading(
spark: SparkSession,
table: CarbonTable,
- loadModel: CarbonLoadModel,
- stageInput: Seq[StageInput]
+ stageInput: Seq[StageInput],
+ stageFiles: Array[(CarbonFile, CarbonFile)],
+ snapshotFilePath: String
): Unit = {
- val splits = stageInput.flatMap(_.createSplits().asScala)
- LOGGER.info(s"start to load ${splits.size} files into " +
- s"${table.getDatabaseName}.${table.getTableName}")
- val start = System.currentTimeMillis()
- val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
- DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
- spark,
- Option(dataFrame),
- loadModel,
- SparkSQLUtil.sessionState(spark).newHadoopConf()
- ).map { row =>
- (row._1, FailureCauses.NONE == row._2._2.failureCauses)
- }
+ var loadModel: CarbonLoadModel = null
+ try {
+ // 1) add new segment with INSERT_IN_PROGRESS into table status
+ loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
+ CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
- LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis()
- start}ms")
+ // 2) write all existing stage file names and segmentId into a new
snapshot file
+ // The content of snapshot file is: first line is segmentId, followed by
each line is
+ // one stage file name
+ val content =
+ (Seq(loadModel.getSegmentId) ++
stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
+ FileFactory.writeFile(content, snapshotFilePath)
+
+ // 3) do loading.
+ val splits = stageInput.flatMap(_.createSplits().asScala)
+ LOGGER.info(s"start to load ${splits.size} files into " +
+ s"${table.getDatabaseName}.${table.getTableName}")
+ val start = System.currentTimeMillis()
+ val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
+ DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+ spark,
+ Option(dataFrame),
+ loadModel,
+ SparkSQLUtil.sessionState(spark).newHadoopConf()
+ ).map { row =>
+ (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+ }
+ LOGGER.info(s"finish data loading, time taken
${System.currentTimeMillis() - start}ms")
+
+ // 4) write segment file and update the segment entry to SUCCESS
+ val segmentFileName = SegmentFileStore.writeSegmentFile(
+ table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
+ SegmentFileStore.updateTableStatusFile(
+ table, loadModel.getSegmentId, segmentFileName,
+ table.getCarbonTableIdentifier.getTableId,
+ new SegmentFileStore(table.getTablePath, segmentFileName),
+ SegmentStatus.SUCCESS)
+ } catch {
+ case ex: Throwable =>
+ LOGGER.error(s"failed to insert
${table.getDatabaseName}.${table.getTableName}", ex)
+ if (loadModel != null) {
+ CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
+ }
+ throw ex
+ }
}
/**
@@ -292,15 +299,18 @@ case class CarbonInsertFromStageCommand(
private def startLoadingWithPartition(
spark: SparkSession,
table: CarbonTable,
- loadModel: CarbonLoadModel,
- stageInput: Seq[StageInput]
+ stageInput: Seq[StageInput],
+ stageFiles: Array[(CarbonFile, CarbonFile)],
+ snapshotFilePath: String
): Unit = {
val partitionDataList = listPartitionFiles(stageInput)
+
+ val content = stageFiles.map(_._1.getAbsolutePath).mkString("\n")
+ FileFactory.writeFile(content, snapshotFilePath)
+
val start = System.currentTimeMillis()
- var index = 0
partitionDataList.map {
case (partition, splits) =>
- index = index + 1
LOGGER.info(s"start to load ${splits.size} files into " +
s"${table.getDatabaseName}.${table.getTableName}. " +
s"Partition information: ${partition.mkString(",")}")
@@ -484,22 +494,20 @@ case class CarbonInsertFromStageCommand(
.map(_.getColName)
.toArray
val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
- val rdd: RDD[Row] = new CarbonScanRDD[InternalRow](
- sparkSession,
- columnProjection = new CarbonProjection(columns),
- null,
- carbonTable.getAbsoluteTableIdentifier,
- carbonTable.getTableInfo.serialize,
- carbonTable.getTableInfo,
- new CarbonInputMetrics,
- null,
- null,
- classOf[SparkRowReadSupportImpl],
- splits.asJava
- ).map { row =>
- new GenericRow(row.toSeq(schema).toArray)
- }
- sparkSession.createDataFrame(rdd, schema)
+ val rdd: RDD[InternalRow] = new CarbonScanRDD[InternalRow](
+ sparkSession,
+ columnProjection = new CarbonProjection(columns),
+ null,
+ carbonTable.getAbsoluteTableIdentifier,
+ carbonTable.getTableInfo.serialize,
+ carbonTable.getTableInfo,
+ new CarbonInputMetrics,
+ null,
+ classOf[SparkDataTypeConverterImpl],
+ classOf[SparkRowReadSupportImpl],
+ splits.asJava
+ )
+ SparkSQLUtil.execute(rdd, schema, sparkSession)
}
override protected def opName: String = "INSERT STAGE"
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0309e91..1334178 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -52,7 +52,6 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -662,34 +661,14 @@ case class CarbonLoadDataCommand(
curAttributes: Seq[AttributeReference],
sortScope: SortScopeOptions.SortScope,
isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+ val catalogAttributes = catalogTable.schema.toAttributes
// Converts the data as per the loading steps before give it to writer or sorter
- val convertedRdd = convertData(
+ val updatedRdd = convertData(
rdd,
sparkSession,
loadModel,
isDataFrame,
partitionValues)
- val updatedRdd = if (isDataFrame) {
- val columnCount = loadModel.getCsvHeaderColumns.length
- convertedRdd.map { row =>
- val array = new Array[AnyRef](columnCount)
- val data = row.getData
- var i = 0
- while (i < columnCount) {
- data(i) match {
- case string: String =>
- array(i) = UTF8String.fromString(string)
- case _ =>
- array(i) = data(i)
- }
- i = i + 1
- }
- array
- }.map(row => InternalRow.fromSeq(row))
- } else {
- convertedRdd.map(row => InternalRow.fromSeq(row.getData))
- }
- val catalogAttributes = catalogTable.schema.toAttributes
var attributes = curAttributes.map(a => {
catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
})
@@ -783,7 +762,7 @@ case class CarbonLoadDataCommand(
sparkSession: SparkSession,
model: CarbonLoadModel,
isDataFrame: Boolean,
- partitionValues: Array[String]): RDD[CarbonRow] = {
+ partitionValues: Array[String]): RDD[InternalRow] = {
val sc = sparkSession.sparkContext
val info =
model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
@@ -827,7 +806,7 @@ case class CarbonLoadDataCommand(
partialSuccessAccum,
inputStepRowCounter,
keepActualData = true)
- }.filter(_ != null)
+ }.filter(_ != null).map(row => InternalRow.fromSeq(row.getData))
finalRDD
}