Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153116624
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala
---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.reflect.runtime._
+import scala.reflect.runtime.universe._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.AstBuilder
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
SubqueryAlias}
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.util.Utils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * Reflection APIs
+ */
+
+object CarbonClassReflectionUtils {
+
+ private val LOGGER =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ private val rm = universe.runtimeMirror(getClass.getClassLoader)
+
+ /**
+ * Returns the field val from a object through reflection.
+ * @param name - name of the field being retrieved.
+ * @param obj - Object from which the field has to be retrieved.
+ * @tparam T
+ * @return
+ */
+ def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
+ val im = rm.reflect(obj)
+
+ im.symbol.typeSignature.members.find(
+ _.name.toString.equals(name)).map(
+ l => im.reflectField(l.asTerm).get
+ // .asInstanceOf[LogicalPlan]
+ ).getOrElse(null)
+ }
+
+ def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T):
Boolean = {
+ val hasField : Boolean = if
(typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
+ true
+ } else {
+ false
+ }
+ hasField
+ }
+
+ def getUnresolvedRelation(tableIdentifier: TableIdentifier,
+ tableAlias: Option[String] = None): UnresolvedRelation = {
+
+ val clazz =
Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
+ try {
+ // For 2.1
+ clazz.getDeclaredField("alias")
+ val ctor = clazz.getConstructors.head
+ ctor.setAccessible(true)
+ val unresolvedrelation = ctor
+ .newInstance(tableIdentifier,
+ Some(tableAlias.getOrElse(""))).asInstanceOf[UnresolvedRelation]
+ unresolvedrelation
+ } catch {
+ case ce: NoSuchFieldException =>
+ // For Spark-2.2
+ val ctor = clazz.getConstructors.head
+ ctor.setAccessible(true)
+ val unresolvedrelation = ctor
+ .newInstance(tableIdentifier).asInstanceOf[UnresolvedRelation]
+ unresolvedrelation
+ }
+ }
+
+ def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
+ relation: LogicalPlan,
+ view: Option[TableIdentifier]): SubqueryAlias = {
+ if (sparkSession.version.contains("2.1")) {
+ //
SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
+ // Project(projList, relation), Option(table.tableIdentifier))
+ val clazz =
Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
+ val ctor = clazz.getConstructors.head
+ ctor.setAccessible(true)
+ val subqueryAlias = ctor
+ .newInstance(alias.getOrElse(""), relation,
Option(view)).asInstanceOf[SubqueryAlias]
+ subqueryAlias
+ } else if (sparkSession.version.contains("2.2")) {
+ //
SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
+ // Project(projList, relation))
+ val clazz =
Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
+ val ctor = clazz.getConstructors.head
+ ctor.setAccessible(true)
+ val subqueryAlias = ctor
+ .newInstance(alias.getOrElse(""),
relation).asInstanceOf[SubqueryAlias]
+ subqueryAlias
+ } else {
+ throw new UnsupportedOperationException("Unsupported Spark version")
+ }
+ }
+
+ def getOverWrite[T: TypeTag : reflect.ClassTag](name: String, obj: T):
Boolean = {
+ var overwriteboolean: Boolean = false
+ val im = rm.reflect(obj)
+ for (l <-
im.symbol.typeSignature.members.filter(_.name.toString.contains("enabled"))) {
+ overwriteboolean =
im.reflectField(l.asTerm).get.asInstanceOf[Boolean]
+ }
+ overwriteboolean
+ }
+
+ def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj:
T): Boolean = {
+ var overwriteboolean: Boolean = false
+ val im = rm.reflect(obj)
+ for (m <- typeOf[T].members.filter(!_.isMethod)) {
+ if (m.toString.contains("overwrite")) {
+ val typ = m.typeSignature
+ if (typ.toString.contains("Boolean")) {
+ // Spark2.2
+ overwriteboolean =
im.reflectField(m.asTerm).get.asInstanceOf[Boolean]
+ } else {
+ overwriteboolean = getOverWrite("enabled",
im.reflectField(m.asTerm).get)
+ }
+ }
+ }
+ overwriteboolean
+ }
+
+ def getFieldOfCatalogTable[T: TypeTag : reflect.ClassTag](name: String,
obj: T): Any = {
+ val im = rm.reflect(obj)
+ val sym = im.symbol.typeSignature.member(TermName(name))
+ val tableMeta = im.reflectMethod(sym.asMethod).apply()
+ tableMeta
+ }
+
+ def getAstBuilder(conf: SQLConf,
+ sqlParser: CarbonSpark2SqlParser,
+ sparkSession: SparkSession): AstBuilder = {
+ if (sparkSession.version.contains("2.1")) {
+ val clazz =
Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
+ val ctor = clazz.getConstructors.head
+ ctor.setAccessible(true)
+ val astBuilder = ctor.newInstance(conf,
sqlParser).asInstanceOf[AstBuilder]
+ astBuilder
+ } else if (sparkSession.version.contains("2.2")) {
+ val clazz =
Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
+ val ctor = clazz.getConstructors.head
+ ctor.setAccessible(true)
+ val astBuilder = ctor.newInstance(conf,
sqlParser).asInstanceOf[AstBuilder]
+ astBuilder
+ } else {
+ throw new UnsupportedOperationException("Spark version not
supported")
+ }
+ }
+
+ def getSessionState(sparkContext: SparkContext, carbonSession:
CarbonSession): SessionState = {
+ if (sparkContext.version.contains("2.1")) {
--- End diff --
I think this needs to use startsWith("2.1"). If equals is used, it will fail
when I use Spark 2.2.1.
---