Github user gvramana commented on a diff in the pull request:

    
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106095089
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 ---
    @@ -136,6 +140,298 @@ case class AlterTableCompaction(alterTableModel: 
AlterTableModel) extends Runnab
       }
     }
     
    +/**
    + * Command that changes the data type, precision and scale of an existing column of a
    + * carbon table and records the change in the table's schema evolution history.
    + *
    + * @param alterTableDataTypeChangeModel table name, optional database name, column name and
    + *                                      target data type info of the requested change
    + */
    +private[sql] case class AlterTableDataTypeChange(
    +    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends RunnableCommand {
    +
    +  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  /**
    +   * Rewrites the schema file with the new data type of the column, then updates the
    +   * 'spark.sql.sources.schema' table property through the Hive client and refreshes the
    +   * table in the session catalog.
    +   *
    +   * Fails through `sys.error` when the table does not exist, when the metadata lock cannot
    +   * be acquired, or when the column is not present in the schema file.
    +   *
    +   * @param sparkSession session used for the metastore lookup and the catalog update
    +   * @return always an empty sequence
    +   */
    +  def run(sparkSession: SparkSession): Seq[Row] = {
    +    val tableName = alterTableDataTypeChangeModel.tableName
    +    val dbName = alterTableDataTypeChangeModel.databaseName
    +      .getOrElse(sparkSession.catalog.currentDatabase)
    +    LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
    +    val relation =
    +      CarbonEnv.get.carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession)
    +        .asInstanceOf[CarbonRelation]
    +    if (relation == null) {
    +      LOGGER.audit(s"Alter table change data type request has failed. " +
    +                   s"Table $dbName.$tableName does not exist")
    +      sys.error(s"Table $dbName.$tableName does not exist")
    +    }
    +    val table = relation.tableMeta.carbonTable
    +    val carbonLock = CarbonLockFactory
    +      .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
    +        LockUsage.METADATA_LOCK)
    +    // getCarbonLockObj only creates the lock object; the lock has to be taken explicitly
    +    // before the schema file is read and rewritten.
    +    if (!carbonLock.lockWithRetries()) {
    +      LOGGER.audit(s"Alter table change data type request has failed. " +
    +                   s"Table $dbName.$tableName is locked for updation")
    +      sys.error(s"Table $dbName.$tableName is locked for updation. Please try after some time")
    +    }
    +    try {
    +      // get the latest carbon table
    +      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
    +      val columnName = alterTableDataTypeChangeModel.columnName
    +      val dataTypeInfo = alterTableDataTypeChangeModel.dataTypeInfo
    +      // read the latest schema file
    +      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
    +        carbonTable.getCarbonTableIdentifier)
    +      val tableMetadataFile = carbonTablePath.getSchemaFilePath
    +      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
    +        .readSchemaFile(tableMetadataFile)
    +      // check for column existence before touching the schema
    +      val matchingColumns = tableInfo.fact_table.table_columns.asScala
    +        .filter(_.column_name.equalsIgnoreCase(columnName))
    +      if (matchingColumns.isEmpty) {
    +        LOGGER.audit(s"Alter table change data type request has failed. " +
    +                     s"Column $columnName does not exist in table $dbName.$tableName")
    +        sys.error(s"Column $columnName does not exist in table $dbName.$tableName")
    +      }
    +      // keep a copy of the column as it was (removed) next to the modified column (added)
    +      // for the schema evolution history
    +      val changedColumns = matchingColumns.map { columnSchema =>
    +        val previousColumnSchema = CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
    +        columnSchema.setData_type(
    +          DataTypeConverterUtil.convertToThriftDataType(dataTypeInfo.dataType))
    +        columnSchema.setPrecision(dataTypeInfo.precision)
    +        columnSchema.setScale(dataTypeInfo.scale)
    +        (columnSchema, previousColumnSchema)
    +      }
    +      val (addColumnSchema, deletedColumnSchema) = changedColumns.last
    +      // one timestamp for both the evolution entry and the history record
    +      val timeStamp = System.currentTimeMillis
    +      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
    +      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
    +      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
    +      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
    +        .setTime_stamp(timeStamp)
    +      CarbonEnv.get.carbonMetastore
    +        .updateTableSchema(carbonTable.getCarbonTableIdentifier,
    +          tableInfo,
    +          schemaEvolutionEntry,
    +          carbonTable.getStorePath)(sparkSession)
    +
    +      // publish the new schema to the Hive metastore and drop Spark's cached relation
    +      val tableIdentifier = TableIdentifier(tableName, Some(dbName))
    +      val schema = CarbonEnv.get.carbonMetastore
    +        .lookupRelation(tableIdentifier)(sparkSession).schema.json
    +      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
    +        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
    +      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
    +      LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
    +      LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
    +    } catch {
    +      case e: Exception =>
    +        LOGGER.error("Alter table change datatype failed : " + e.getMessage)
    +        throw e
    +    } finally {
    +      // release lock after command execution completion
    +      if (carbonLock.unlock()) {
    +        LOGGER.info("Alter table change data type lock released successfully")
    +      } else {
    +        LOGGER.error("Unable to release lock during alter table change data type operation")
    +      }
    +    }
    +    Seq.empty
    +  }
    +}
    +
    +private[sql] case class AlterTableAddColumns(
    +    alterTableAddColumnsModel: AlterTableAddColumnsModel) extends 
RunnableCommand {
    +
    +  val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  def run(sparkSession: SparkSession): Seq[Row] = {
    +    val tableName = alterTableAddColumnsModel.tableName
    +    val dbName = alterTableAddColumnsModel.databaseName
    +      .getOrElse(sparkSession.catalog.currentDatabase)
    +    LOGGER.audit(s"Alter table add columns request has been received for 
$dbName.$tableName")
    +    val relation =
    +      CarbonEnv.get.carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession)
    +        .asInstanceOf[CarbonRelation]
    +    if (relation == null) {
    +      LOGGER.audit(s"Alter table add columns request has failed. " +
    +                   s"Table $dbName.$tableName does not exist")
    +      sys.error(s"Table $dbName.$tableName does not exist")
    +    }
    +    // acquire the lock first
    +    val table = relation.tableMeta.carbonTable
    +    val carbonLock = CarbonLockFactory
    +      
.getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
    +        LockUsage.METADATA_LOCK)
    +    try {
    +      // get the latest carbon table and check for column existence
    +      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + 
"_" + tableName)
    +      // read the latest schema file
    +      val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
    +        carbonTable.getCarbonTableIdentifier)
    +      val tableMetadataFile = carbonTablePath.getSchemaFilePath
    +      val thriftTableInfo: org.apache.carbondata.format.TableInfo = 
CarbonEnv.get.carbonMetastore
    +        .readSchemaFile(tableMetadataFile)
    +      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
    +      val wrapperTableInfo = schemaConverter
    +        .fromExternalToWrapperTableInfo(thriftTableInfo,
    +          dbName,
    +          tableName,
    +          carbonTable.getStorePath)
    +      val newCols = new AlterTableProcessor(alterTableAddColumnsModel,
    +        dbName,
    +        wrapperTableInfo,
    +        carbonTablePath,
    +        carbonTable.getCarbonTableIdentifier,
    +        carbonTable.getStorePath).process
    +      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata
    +      .schema.SchemaEvolutionEntry()
    +      schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
    +      schemaEvolutionEntry.setAdded(newCols.toList.asJava)
    +
    +      val thriftTable = schemaConverter
    +        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, 
tableName)
    +      
thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
    +        .setTime_stamp(System.currentTimeMillis)
    +      CarbonEnv.get.carbonMetastore
    +        .updateTableSchema(carbonTable.getCarbonTableIdentifier,
    --- End diff --
    
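    Extract the logic shared by these alter table commands (relation lookup, lock handling, schema file read and schema update) into common functions to avoid duplicate code.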


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to