[CARBONDATA-1909] Load fails during an insert-into operation when data is concurrently being loaded into the source table
This closes #1693 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ab763474 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ab763474 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ab763474 Branch: refs/heads/carbonstore Commit: ab763474f9a8191d84ea742a8b6ee615d310999a Parents: a597c2f Author: Manohar <[email protected]> Authored: Wed Dec 20 15:09:45 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Mon Jan 29 15:30:53 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 18 +++++++++++ .../management/CarbonInsertIntoCommand.scala | 34 ++++++++++++++++++-- 2 files changed, 49 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab763474/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 13c8a42..f46feef 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -865,6 +865,24 @@ public final class CarbonCommonConstants { public static final String CARBON_MERGE_SORT_PREFETCH_DEFAULT = "true"; /** + * If we are executing insert into query from source table using select statement + * & loading the same source table concurrently, when select happens on source table + * during the data load , it gets new record for which dictionary is not generated, + * So there will be inconsistency. 
To avoid this condition we can persist the dataframe + * into MEMORY_AND_DISK and perform insert into operation. By default this value + * will be false because no need to persist the dataframe in all cases. If user want + * to run load and insert queries on source table concurrently then user can enable this flag + */ + @CarbonProperty + public static final String CARBON_INSERT_PERSIST_ENABLED = "carbon.insert.persist.enable"; + + /** + * by default rdd will not be persisted in the insert case. + + */ + public static final String CARBON_INSERT_PERSIST_ENABLED_DEFAULT = "false"; + + /** * default name of data base */ public static final String DATABASE_DEFAULT_NAME = "default"; http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab763474/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala index 626cdba..86d6759 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala @@ -18,9 +18,14 @@ package org.apache.spark.sql.execution.command.management import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} +import org.apache.spark.storage.StorageLevel +import 
org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.util.CarbonSparkUtil case class CarbonInsertIntoCommand( @@ -33,7 +38,26 @@ case class CarbonInsertIntoCommand( var loadCommand: CarbonLoadDataCommand = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - val df = Dataset.ofRows(sparkSession, child) + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + def containsLimit(plan: LogicalPlan): Boolean = { + plan find { + case limit: GlobalLimit => true + case other => false + } isDefined + } + val isPersistEnabledUserValue = CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED, + CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT) + val isPersistRequired = + isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child) + val df = + if (isPersistRequired) { + LOGGER.audit("Persist enabled for Insert operation") + Dataset.ofRows(sparkSession, child) + .persist(StorageLevel.MEMORY_AND_DISK) + } else { + Dataset.ofRows(sparkSession, child) + } val header = relation.tableSchema.get.fields.map(_.name).mkString(",") loadCommand = CarbonLoadDataCommand( databaseNameOp = Some(relation.carbonRelation.databaseName), @@ -48,7 +72,11 @@ case class CarbonInsertIntoCommand( tableInfoOp = None, internalOptions = Map.empty, partition = partition) - loadCommand.processMetadata(sparkSession) + val load = loadCommand.processMetadata(sparkSession) + if (isPersistRequired) { + df.unpersist() + } + load } override def processData(sparkSession: SparkSession): Seq[Row] = { if (null != loadCommand) {
