Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r196627213
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.management
    +
    +import java.util.UUID
    +
    +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
    +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
    +import org.apache.spark.sql.execution.command.AtomicRunnableCommand
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.util.FileUtils
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datamap.status.DataMapStatusManager
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil
    +import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
    +import org.apache.carbondata.core.util.CarbonUtil
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
    +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
    +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil
    +
    +/**
    + * Supports the `alter table tableName add segment location 'path'` command.
    + * It creates a segment and maps the path of the data files to the segment's storage.
    + */
    +case class CarbonAddSegmentCommand(
    +    dbNameOp: Option[String],
    +    tableName: String,
    +    filePathFromUser: String,
    +    var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    +  var carbonTable: CarbonTable = _
    +
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession)
    +    carbonTable = {
    +      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
    +      if (relation == null) {
    +        LOGGER.error(s"Add segment failed due to table $dbName.$tableName not found")
    +        throw new NoSuchTableException(dbName, tableName)
    +      }
    +      relation.carbonTable
    +    }
    +
    +    if (carbonTable.isHivePartitionTable) {
    +      LOGGER.error("Ignore hive partition table for now")
    +    }
    +
    +    operationContext.setProperty("isOverwrite", false)
    +    if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
    +      val loadMetadataEvent = new LoadMetadataEvent(carbonTable, false)
    +      OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
    +    }
    +    Seq.empty
    +  }
    +
    +  // only maps external files to segment metadata
    +  override def processData(sparkSession: SparkSession): Seq[Row] = {
    --- End diff --
    
    In my opinion, creating the segment and updating the tablestatus both
    belong in `processData`. In other commands, such as LoadData, these
    operations are performed in `processData` as well.
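    
    To make the intended split concrete, below is a minimal, self-contained
    Scala sketch of the two-phase contract this argument relies on. It is
    plain Scala, not CarbonData's actual `AtomicRunnableCommand` API, and
    every name in it is illustrative only:
    
    ```scala
    // Sketch of the metadata/data split the comment argues for:
    // processMetadata only resolves and validates metadata, while
    // processData performs the mutation (creating the segment and
    // updating the tablestatus), mirroring how LoadData behaves.
    trait TwoPhaseCommand {
      def processMetadata(): Unit // lookup/validation only
      def processData(): Unit     // segment creation and status update
      final def run(): Unit = {
        processMetadata()
        processData()
      }
    }
    
    // Hypothetical add-segment command following that contract.
    final class AddSegmentSketch(tablePath: String, externalPath: String)
      extends TwoPhaseCommand {
    
      override def processMetadata(): Unit =
        println(s"validate that a table exists at $tablePath")
    
      override def processData(): Unit = {
        println(s"create a segment mapped to files under $externalPath")
        println(s"append the new segment entry to $tablePath/tablestatus")
      }
    }
    
    object AddSegmentDemo extends App {
      new AddSegmentSketch("/store/db1/t1", "/external/seg0").run()
    }
    ```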

