Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1616#discussion_r155471634
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---
@@ -151,58 +136,76 @@ object CarbonSession {
return session
}
- // No active nor global default session. Create a new one.
- val sparkContext = userSuppliedContext.getOrElse {
- // set app name if not given
- val randomAppName = java.util.UUID.randomUUID().toString
- val sparkConf = new SparkConf()
- options.foreach { case (k, v) => sparkConf.set(k, v) }
- if (!sparkConf.contains("spark.app.name")) {
- sparkConf.setAppName(randomAppName)
+ // Global synchronization so we will only set the default session
once.
+ SparkSession.synchronized {
+ // If the current thread does not have an active session, get it
from the global session.
+ session = SparkSession.getDefaultSession match {
+ case Some(sparkSession: CarbonSession) =>
+ if ((sparkSession ne null) &&
!sparkSession.sparkContext.isStopped) {
+ options
+ .foreach { case (k, v) =>
sparkSession.sessionState.conf.setConfString(k, v) }
+ sparkSession
+ } else {
+ null
+ }
+ case _ => null
}
- val sc = SparkContext.getOrCreate(sparkConf)
- // maybe this is an existing SparkContext, update its SparkConf
which maybe used
- // by SparkSession
- options.foreach { case (k, v) => sc.conf.set(k, v) }
- if (!sc.conf.contains("spark.app.name")) {
- sc.conf.setAppName(randomAppName)
+ if (session ne null) {
+ return session
}
- sc
- }
- session = new CarbonSession(sparkContext)
- val carbonProperties = CarbonProperties.getInstance()
- if (storePath != null) {
-
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
- // In case if it is in carbon.properties for backward compatible
- } else if
(carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
-
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
- session.sessionState.conf.warehousePath)
- }
- options.foreach { case (k, v) =>
session.sessionState.conf.setConfString(k, v) }
- SparkSession.setDefaultSession(session)
- try {
- CommonUtil.cleanInProgressSegments(
-
carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION),
sparkContext)
- } catch {
- case e: Throwable =>
- // catch all exceptions to avoid CarbonSession initialization
failure
- LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- .error(e, "Failed to clean in progress segments")
- }
- // Register a successfully instantiated context to the singleton.
This should be at the
- // end of the class definition so that the singleton is updated
only if there is no
- // exception in the construction of the instance.
- sparkContext.addSparkListener(new SparkListener {
- override def onApplicationEnd(applicationEnd:
SparkListenerApplicationEnd): Unit = {
- SparkSession.setDefaultSession(null)
- SparkSession.sqlListener.set(null)
+ // No active nor global default session. Create a new one.
+ val sparkContext = userSuppliedContext.getOrElse {
+ // set app name if not given
+ val randomAppName = java.util.UUID.randomUUID().toString
+ val sparkConf = new SparkConf()
+ options.foreach { case (k, v) => sparkConf.set(k, v) }
+ if (!sparkConf.contains("spark.app.name")) {
+ sparkConf.setAppName(randomAppName)
+ }
+ val sc = SparkContext.getOrCreate(sparkConf)
+ // maybe this is an existing SparkContext, update its
SparkConf which maybe used
+ // by SparkSession
+ options.foreach { case (k, v) => sc.conf.set(k, v) }
+ if (!sc.conf.contains("spark.app.name")) {
+ sc.conf.setAppName(randomAppName)
+ }
+ sc
}
- })
- session.streams.addListener(new
CarbonStreamingQueryListener(session))
- }
- session
+ session = new CarbonSession(sparkContext)
--- End diff --
Why are these changes needed?
---