akashrn5 commented on a change in pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#discussion_r639690258



##########
File path: 
integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, 
CarbonToSparkAdapter}
+import 
org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, SortOrder, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression => 
SparkExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.{DataSourceScanExec, 
WholeStageCodegenExec}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.carbondata.core.index.IndexFilter
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.BucketingInfo
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+/**
+ *  Physical plan node for scanning data. It is applied for both tables
+ *  USING carbondata and STORED AS carbondata.
+ */
+case class CarbonDataSourceScan(
+    @transient relation: CarbonDatasourceHadoopRelation,
+    output: Seq[Attribute],
+    partitionFilters: Seq[SparkExpression],
+    dataFilters: Seq[SparkExpression],
+    @transient readComittedScope: ReadCommittedScope,
+    @transient pushedDownProjection: CarbonProjection,
+    @transient pushedDownFilters: Seq[Expression],
+    directScanSupport: Boolean,
+    @transient extraRDD: Option[(RDD[InternalRow], Boolean)] = None,
+    tableIdentifier: Option[TableIdentifier] = None,
+    segmentIds: Option[String] = None)
+  extends DataSourceScanExec {
+
+  override lazy val supportsColumnar: Boolean = CarbonPlanHelper
+    .supportBatchedDataSource(sqlContext, output, extraRDD)
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches 
=>
+      new Iterator[ColumnarBatch] {
+
+        override def hasNext: Boolean = {
+          val res = batches.hasNext
+          res
+        }
+
+        override def next(): ColumnarBatch = {
+          val batch = batches.next()
+          numOutputRows += batch.numRows()
+          batch
+        }
+      }
+    }
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
+
+  lazy val needsUnsafeRowConversion: Boolean = { true }
+
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) = {
+    val info: BucketingInfo = relation.carbonTable.getBucketingInfo
+    if (info != null) {
+      val cols = info.getListOfColumns.asScala
+      val numBuckets = info.getNumOfRanges
+      val bucketColumns = cols.flatMap { n =>
+        val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
+        attrRef match {
+          case Some(attr: AttributeReference) =>
+            Some(AttributeReference(attr.name,
+              
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(n.getDataType),
+              attr.nullable,
+              attr.metadata)(attr.exprId, attr.qualifier))
+          case _ => None
+        }
+      }
+      if (bucketColumns.size == cols.size) {
+        // use HashPartitioning will not shuffle
+        (HashPartitioning(bucketColumns, numBuckets), Nil)
+      } else {
+        (UnknownPartitioning(0), Nil)
+      }
+    } else {
+      (UnknownPartitioning(0), Nil)
+    }
+  }
+
+  override lazy val metadata: Map[String, String] = {
+    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+    val metadata =
+      Map(
+        "ReadSchema" -> seqToString(pushedDownProjection.getAllColumns),
+        "Batched" -> supportsColumnar.toString,
+        "DirectScan" -> (supportsColumnar && directScanSupport).toString,
+        "PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
+    if (relation.carbonTable.isHivePartitionTable) {
+      metadata + ("PartitionFilters" -> seqToString(partitionFilters)) +
+        ("PartitionCount" -> selectedPartitions.size.toString)
+    } else {
+      metadata
+    }
+  }
+
+  @transient private lazy val indexFilter: IndexFilter = {
+    val filter = pushedDownFilters.reduceOption(new AndExpression(_, _))
+      .map(new IndexFilter(relation.carbonTable, _, true)).orNull
+    if (filter != null && pushedDownFilters.length == 1) {
+      // push down the limit if only one filter
+      filter.setLimit(relation.limit)
+    }
+    filter
+  }
+
+  @transient private lazy val selectedPartitions: Seq[PartitionSpec] = {
+    CarbonFilters
+      .getPartitions(partitionFilters, relation.sparkSession, 
relation.carbonTable)
+      .orNull
+  }
+
+  private lazy val inputRDD: RDD[InternalRow] = {
+    val carbonRdd = new CarbonScanRDD[InternalRow](
+      relation.sparkSession,
+      pushedDownProjection,
+      indexFilter,
+      relation.identifier,
+      relation.carbonTable.getTableInfo.serialize(),
+      relation.carbonTable.getTableInfo,
+      new CarbonInputMetrics,
+      selectedPartitions,
+      segmentIds = segmentIds)
+    carbonRdd.setVectorReaderSupport(supportsColumnar)
+    carbonRdd.setDirectScanSupport(supportsColumnar && directScanSupport)
+    extraRDD.map(_._1.union(carbonRdd)).getOrElse(carbonRdd)
+  }
+

Review comment:
       In this class too, the common code should be moved to the common package; 
only the ColumnarBatch-specific code needs to be kept for 3.1.

##########
File path: 
integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.CarbonException
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * parser order: carbon parser => spark parser
+ */
+class CarbonExtensionSqlParser(

Review comment:
       can combine 2.3 and 2.4

##########
File path: 
integration/spark/src/main/spark3.1/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.Callable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTablePartition, ExternalCatalogWithListener, FunctionResourceLoader, 
GlobalTempViewManager}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.execution.strategy.{CarbonSourceStrategy, 
DDLStrategy, DMLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if 
the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(

Review comment:
       can combine this class of 2.3 and 2.4 and move to common

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -240,6 +235,21 @@ object DMLStrategy extends SparkStrategy {
     
condition.get.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonJoinRangeListUDF]
   }
 
+  object CarbonExtractEquiJoinKeys {
+    def unapply(plan: LogicalPlan): Option[(JoinType, Seq[Expression], 
Seq[Expression],
+      Option[Expression], LogicalPlan, LogicalPlan)] = {
+      plan match {
+        case join: Join =>
+          ExtractEquiJoinKeys.unapply(join) match {
+              // ignoring hints as carbon is not using them right now

Review comment:
       add a TODO here to support join with Hints

##########
File path: 
integration/spark/src/main/spark3.1/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util
+
+import org.apache.spark.sql.CarbonToSparkAdapter
+
+import scala.collection.JavaConverters._
+import org.apache.spark.sql.catalyst.CarbonParserUtil
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser
+import 
org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, 
CreateTableContext, HiveChangeColumnContext}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
QualifiedColType}
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, 
AlterTableDataTypeChangeModel}
+import 
org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand,
 CarbonAlterTableColRenameDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.CarbonExplainCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.{DecimalType, StructField}
+
+trait SqlAstBuilderHelper extends SparkSqlAstBuilder {
+
+

Review comment:
       remove extra line

##########
File path: 
mv/plan/src/main/spark2.3/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala
##########
@@ -47,4 +55,110 @@ object ExpressionHelper {
     reference.qualifier.head
   }
 
+  def getStatisticsObj(outputList: Seq[NamedExpression],
+      plan: LogicalPlan, stats: Statistics,
+      aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = {
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq.head
+    val attributes: AttributeMap[ColumnStat] = stats.attributeStats
+    var attributeStats = AttributeMap(attributes.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    if (aliasMap.isDefined) {
+      attributeStats = AttributeMap(
+        attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+    }
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
+  }
+
+  def getOptimizedPlan(s: SubqueryExpression): LogicalPlan = {
+    val Subquery(newPlan) = BirdcageOptimizer.execute(Subquery(s.plan))
+    newPlan
+  }
+
+  def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): 
NamedExpression = {
+    QueryPlan.normalizeExprId(r, attrs)
+  }
+
+  def attributeMap(rAliasMap: AttributeMap[Attribute]) : 
AttributeMap[Expression] = {
+    rAliasMap.asInstanceOf[AttributeMap[Expression]]
+  }
+
+  def seqOfRules : Seq[Rule[LogicalPlan]] = {
+    Seq(
+      // Operator push down
+      PushProjectionThroughUnion,
+      ReorderJoin,
+      EliminateOuterJoin,
+      PushPredicateThroughJoin,
+      PushDownPredicate,
+      ColumnPruning,
+      // Operator combine
+      CollapseRepartition,
+      CollapseProject,
+      CollapseWindow,
+      CombineFilters,
+      CombineLimits,
+      CombineUnions,
+      // Constant folding and strength reduction
+      NullPropagation,
+      FoldablePropagation,
+      ConstantFolding,
+      ReorderAssociativeOperator,
+      // No need to apply LikeSimplification rule while creating MV
+      // as modular plan asCompactSql will be set in schema
+      //        LikeSimplification,
+      BooleanSimplification,
+      SimplifyConditionals,
+      RemoveDispensableExpressions,
+      SimplifyBinaryComparison,
+      EliminateSorts,
+      SimplifyCasts,
+      SimplifyCaseConversionExpressions,
+      RewriteCorrelatedScalarSubquery,
+      EliminateSerialization,
+      RemoveRedundantAliases,
+      RemoveRedundantProject)
+  }
+}
+
+trait getVerboseString extends LeafNode {
+}
+
+trait groupByUnaryNode extends UnaryNode {
+}
+
+trait selectModularPlan extends ModularPlan {
+}
+
+trait unionModularPlan extends ModularPlan {
+}
+
+trait oneRowTableLeafNode extends LeafNode {
+}
+
+object MatchJoin {
+  def unapply(plan : LogicalPlan): Option[(LogicalPlan, LogicalPlan, JoinType, 
Option[Expression],
+    Option[Any])] = {
+    plan match {
+      case j@Join(left, right, joinType, condition) =>
+        val a = Some(left, right, joinType, condition, None)
+        a

Review comment:
       Please refactor and move the common code to the common package for 
ExpressionHelper, for all versions.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
##########
@@ -65,7 +65,10 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
   protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
   protected val EXCLUDE = carbonKeyWord("EXCLUDE")
   protected val EXPLAIN = carbonKeyWord("EXPLAIN")
-  protected val EXTENDED = carbonKeyWord("EXTENDED")
+  protected val MODE = carbonKeyWord("EXTENDED") |
+                       carbonKeyWord("CODEGEN") |
+                       carbonKeyWord("COST") |
+                       carbonKeyWord("FORMATTED")

Review comment:
       Please add test cases covering all MODE values (EXTENDED, CODEGEN, COST, FORMATTED) if not already added.

##########
File path: 
mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
##########
@@ -358,6 +359,12 @@ trait Printers {
           ""
         }
         qualifierPrefix + quoteIdentifier(child.name) + " AS " + 
quoteIdentifier(a.name)
+      case a@Alias(child: AggregateExpression, _) =>
+        child.sql + " AS " + quoteIdentifier(a.name)

Review comment:
       Use string interpolation, s"...", instead of String concatenation with +.

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
##########
@@ -109,7 +109,9 @@ public RowParserImpl(DataField[] output, 
CarbonDataLoadConfiguration configurati
       return new String[numberOfColumns];
     }
     // If number of columns are less in a row then create new array with same 
size of header.

Review comment:
       Please update the comment according to the new condition.

##########
File path: 
integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/SparkSqlAstBuilderWrapper.scala
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * use this wrapper to adapter multiple spark versions
+ */
+abstract class SparkSqlAstBuilderWrapper(conf: SQLConf) extends 
SparkSqlAstBuilder {

Review comment:
       can combine 2.3 and 2.4

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
##########
@@ -147,13 +152,15 @@ object BroadCastSIFilterPushJoin {
       inputCopy: Array[InternalRow],
       leftKeys: Seq[Expression],
       rightKeys: Seq[Expression],
-      buildSide: BuildSide,
+      buildSide: CarbonBuildSideType,
       isIndexTable: Boolean = false): Unit = {
 
+    val carbonBuildSide = CarbonBuildSide(buildSide)

Review comment:
       CarbonBuildSide can be put in code common to 2.4 and 3.1; please refactor.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to