This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8865671 [CARBONDATA-3666] Avoided listing of table dir in refresh command
8865671 is described below
commit 8865671c32b1cf450ecc1fdc8c278904fe4a8c3f
Author: kunal642 <[email protected]>
AuthorDate: Thu Jan 16 14:28:57 2020 +0530
[CARBONDATA-3666] Avoided listing of table dir in refresh command
Why is this PR needed?
Currently, if a refresh command is fired on a parquet table using a carbon
session, carbon lists all the tables to check whether the table exists, and
then checks whether the schema file exists by listing the Metadata folder.
This is a problem in cloud scenarios because listing on S3 is slow.
What changes were proposed in this PR?
Get the metadata for the specified table first, then go for the carbon
refresh flow only if the provider is carbon or the table is not registered
in Hive.
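A minimal sketch of the resulting check (it mirrors the patch below; the
inline comments are editorial and not part of the patch):

  // a single getTableMetadata call replaces the listTables scan
  val isCarbonDataSource = try {
    CommonUtil.isCarbonDataSource(sparkSession.sessionState.catalog
      .getTableMetadata(TableIdentifier(tableName, databaseNameOp)))
  } catch {
    // table not registered in Hive: assume it may be a carbon table and
    // let the schema-file existence check decide
    case _: NoSuchTableException => true
  }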
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3581
---
.../apache/carbondata/spark/util/CommonUtil.scala | 10 +++++++
.../apache/spark/sql/hive/CarbonSessionUtil.scala | 26 +++++++++---------
.../management/RefreshCarbonTableCommand.scala | 31 +++++++++++++---------
.../spark/sql/execution/strategy/DDLStrategy.scala | 16 +++++------
.../spark/sql/hive/CarbonFileMetastore.scala | 18 +++++--------
5 files changed, 53 insertions(+), 48 deletions(-)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f0fe08b..e70fc24 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
import org.apache.spark.util.FileUtils
@@ -832,4 +833,13 @@ object CommonUtil {
}
displaySize
}
+
+ def isCarbonDataSource(catalogTable: CatalogTable): Boolean = {
+ catalogTable.provider match {
+ case Some(x) => x.equalsIgnoreCase("org.apache.spark.sql.CarbonSource") ||
+ x.equalsIgnoreCase("carbondata")
+ case None => false
+ }
+ }
+
}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index e3f1d3f..968738a 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -35,6 +35,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.carbondata.spark.util.CommonUtil
+
/**
* This class refresh the relation from cache if the carbontable in
* carbon catalog is not same as cached carbon relation's carbon table.
@@ -59,20 +61,16 @@ object CarbonSessionUtil {
* Set the stats to none in case of carbontable
*/
def setStatsNone(catalogTable: CatalogTable): Unit = {
- catalogTable.provider match {
- case Some(provider)
- if provider.equals("org.apache.spark.sql.CarbonSource") ||
- provider.equalsIgnoreCase("carbondata") =>
- // Update stats to none in case of carbon table as we are not expecting any stats from
- // Hive. Hive gives wrong stats for carbon table.
- catalogTable.stats match {
- case Some(stats) =>
- CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
- case _ =>
- }
- isRelationRefreshed =
- CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
- case _ =>
+ if (CommonUtil.isCarbonDataSource(catalogTable)) {
+ // Update stats to none in case of carbon table as we are not expecting any stats from
+ // Hive. Hive gives wrong stats for carbon table.
+ catalogTable.stats match {
+ case Some(stats) =>
+ CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
+ case _ =>
+ }
+ isRelationRefreshed =
+ CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 9251cf0..17e628f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
@@ -41,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
+import org.apache.carbondata.spark.util.CommonUtil
/**
* Command to register carbon table from existing carbon table data
@@ -52,24 +54,31 @@ case class RefreshCarbonTableCommand(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
- val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
setAuditTable(databaseName, tableName)
// Steps
- // 1. get table path
- // 2. perform the below steps
- // 2.1 check if the table already register with hive then ignore and continue with the next
- // schema
+ // 1. Get table metadata from Spark.
+ // 2. Perform the below steps:
+ // 2.1 If the table exists, check whether the provider is carbon. If yes, go for the carbon
+ // refresh, otherwise nothing needs to be done.
+ // 2.1.1 If the table does not exist, consider it a carbon table and check for schema file
+ // existence.
// 2.2 register the table with the hive check if the table being registered has aggregate table
// then do the below steps
// 2.2.1 validate that all the aggregate tables are copied at the store location.
// 2.2.2 Register the aggregate tables
- val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
- val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
// 2.1 check if the table already register with hive then ignore and continue with the next
// schema
- if (!sparkSession.sessionState.catalog.listTables(databaseName)
- .exists(_.table.equalsIgnoreCase(tableName))) {
+ val isCarbonDataSource = try {
+ CommonUtil.isCarbonDataSource(sparkSession.sessionState.catalog
+ .getTableMetadata(TableIdentifier(tableName, databaseNameOp)))
+ } catch {
+ case _: NoSuchTableException =>
+ true
+ }
+ if (isCarbonDataSource) {
+ val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
+ val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
// check the existence of the schema file to know its a carbon table
val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
// if schema file does not exist then the table will either non carbon table or stale
@@ -106,9 +115,7 @@ case class RefreshCarbonTableCommand(
}
}
}
- RefreshTable(
- TableIdentifier(identifier.getTableName, Option(identifier.getDatabaseName))
- ).run(sparkSession)
+ RefreshTable(TableIdentifier(tableName, Option(databaseName))).run(sparkSession)
}
/**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 80d3044..68f7442 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, C
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.CommonUtil
/**
* Carbon strategies for ddl commands
@@ -147,20 +149,20 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
if isCarbonTable(truncateTable.tableName) =>
ExecutedCommandExec(CarbonTruncateCommand(truncateTable)) :: Nil
case createTable@org.apache.spark.sql.execution.datasources.CreateTable(_, _, None)
- if isCarbonDataSourceTable(createTable.tableDesc) =>
+ if CommonUtil.isCarbonDataSource(createTable.tableDesc) =>
ExecutedCommandExec(DDLHelper.createDataSourceTable(createTable, sparkSession)) :: Nil
case MatchCreateDataSourceTable(tableDesc, mode, query)
- if isCarbonDataSourceTable(tableDesc) =>
+ if CommonUtil.isCarbonDataSource(tableDesc) =>
ExecutedCommandExec(
DDLHelper.createDataSourceTableAsSelect(tableDesc, query, mode, sparkSession)
) :: Nil
case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
- if isCarbonDataSourceTable(tableDesc) =>
+ if CommonUtil.isCarbonDataSource(tableDesc) =>
ExecutedCommandExec(
DDLHelper.createDataSourceTableAsSelect(tableDesc, query.get, mode, sparkSession)
) :: Nil
case createTable@CreateDataSourceTableCommand(table, _)
- if isCarbonDataSourceTable(table) =>
+ if CommonUtil.isCarbonDataSource(table) =>
ExecutedCommandExec(
DDLHelper.createDataSourceTable(createTable, sparkSession)
) :: Nil
@@ -195,12 +197,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
CarbonPlanHelper.isCarbonTable(tableIdent, sparkSession)
}
- private def isCarbonDataSourceTable(table: CatalogTable): Boolean = {
- table.provider.get != DDLUtils.HIVE_PROVIDER &&
- (table.provider.get.equals("org.apache.spark.sql.CarbonSource") ||
- table.provider.get.equalsIgnoreCase("carbondata"))
- }
-
private def isCarbonHiveTable(table: CatalogTable): Boolean = {
table.provider.isDefined &&
DDLUtils.HIVE_PROVIDER == table.provider.get &&
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6c7b1f2..15b2a51 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
// use to lock the carbonTables
@@ -216,12 +216,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
"org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
val catalogTable =
CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
- catalogTable.provider match {
- case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
- || name.equalsIgnoreCase("carbondata")) => name
- case _ =>
- CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
- throw new NoSuchTableException(database, tableIdentifier.table)
+ if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+ CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
+ throw new NoSuchTableException(database, tableIdentifier.table)
}
val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
catalogTable.location.toString, database, tableIdentifier.table)
@@ -540,11 +537,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
"org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
val catalogTable =
CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
- catalogTable.provider match {
- case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
- || name.equalsIgnoreCase("carbondata")) => name
- case _ =>
- throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
+ if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+ throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
}
val tableLocation = catalogTable.storage.locationUri match {
case tableLoc@Some(uri) =>