Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1616#discussion_r155471634
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---
    @@ -151,58 +136,76 @@ object CarbonSession {
               return session
             }
     
    -        // No active nor global default session. Create a new one.
    -        val sparkContext = userSuppliedContext.getOrElse {
    -          // set app name if not given
    -          val randomAppName = java.util.UUID.randomUUID().toString
    -          val sparkConf = new SparkConf()
    -          options.foreach { case (k, v) => sparkConf.set(k, v) }
    -          if (!sparkConf.contains("spark.app.name")) {
    -            sparkConf.setAppName(randomAppName)
    +        // Global synchronization so we will only set the default session 
once.
    +        SparkSession.synchronized {
    +          // If the current thread does not have an active session, get it 
from the global session.
    +          session = SparkSession.getDefaultSession match {
    +            case Some(sparkSession: CarbonSession) =>
    +              if ((sparkSession ne null) && 
!sparkSession.sparkContext.isStopped) {
    +                options
    +                  .foreach { case (k, v) => 
sparkSession.sessionState.conf.setConfString(k, v) }
    +                sparkSession
    +              } else {
    +                null
    +              }
    +            case _ => null
               }
    -          val sc = SparkContext.getOrCreate(sparkConf)
    -          // maybe this is an existing SparkContext, update its SparkConf 
which maybe used
    -          // by SparkSession
    -          options.foreach { case (k, v) => sc.conf.set(k, v) }
    -          if (!sc.conf.contains("spark.app.name")) {
    -            sc.conf.setAppName(randomAppName)
    +          if (session ne null) {
    +            return session
               }
    -          sc
    -        }
     
    -        session = new CarbonSession(sparkContext)
    -        val carbonProperties = CarbonProperties.getInstance()
    -        if (storePath != null) {
    -          
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
    -          // In case if it is in carbon.properties for backward compatible
    -        } else if 
(carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
    -          
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
    -            session.sessionState.conf.warehousePath)
    -        }
    -        options.foreach { case (k, v) => 
session.sessionState.conf.setConfString(k, v) }
    -        SparkSession.setDefaultSession(session)
    -        try {
    -          CommonUtil.cleanInProgressSegments(
    -            
carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION), 
sparkContext)
    -        } catch {
    -          case e: Throwable =>
    -            // catch all exceptions to avoid CarbonSession initialization 
failure
    -          LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    -            .error(e, "Failed to clean in progress segments")
    -        }
    -        // Register a successfully instantiated context to the singleton. 
This should be at the
    -        // end of the class definition so that the singleton is updated 
only if there is no
    -        // exception in the construction of the instance.
    -        sparkContext.addSparkListener(new SparkListener {
    -          override def onApplicationEnd(applicationEnd: 
SparkListenerApplicationEnd): Unit = {
    -            SparkSession.setDefaultSession(null)
    -            SparkSession.sqlListener.set(null)
    +          // No active nor global default session. Create a new one.
    +          val sparkContext = userSuppliedContext.getOrElse {
    +            // set app name if not given
    +            val randomAppName = java.util.UUID.randomUUID().toString
    +            val sparkConf = new SparkConf()
    +            options.foreach { case (k, v) => sparkConf.set(k, v) }
    +            if (!sparkConf.contains("spark.app.name")) {
    +              sparkConf.setAppName(randomAppName)
    +            }
    +            val sc = SparkContext.getOrCreate(sparkConf)
    +            // maybe this is an existing SparkContext, update its 
SparkConf which maybe used
    +            // by SparkSession
    +            options.foreach { case (k, v) => sc.conf.set(k, v) }
    +            if (!sc.conf.contains("spark.app.name")) {
    +              sc.conf.setAppName(randomAppName)
    +            }
    +            sc
               }
    -        })
    -        session.streams.addListener(new 
CarbonStreamingQueryListener(session))
    -      }
     
    -      session
    +          session = new CarbonSession(sparkContext)
    --- End diff --
    
    Why were these changes made?


---

Reply via email to