vinothchandar commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r619592740



##########
File path: pom.xml
##########
@@ -112,6 +112,7 @@
     <spark3.version>3.0.0</spark3.version>
     <spark2bundle.version></spark2bundle.version>
     <spark3bundle.version>3</spark3bundle.version>
+    <hudi.spark.moudle>hudi-spark2</hudi.spark.moudle>

Review comment:
       hudi.spark.module?  Is this change really essential for this PR, or can it go in a separate one?

##########
File path: packaging/hudi-integ-test-bundle/pom.xml
##########
@@ -73,8 +73,7 @@
                   <include>org.apache.hudi:hudi-spark-common</include>
                   <include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
                   <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
-                  <include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include>
-                  <include>org.apache.hudi:hudi-spark3_2.12</include>
+                  <include>org.apache.hudi:${hudi.spark.moudle}_${scala.binary.version}</include>

Review comment:
       So you want to move towards an approach of picking the actual module to bundle? I think we have too many properties for spark 2 vs 3 already. We may need to think about re-simplifying everything, actually, if this change is indeed needed for this PR.

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java
##########
@@ -31,6 +32,11 @@
 
   @Override
   public List<String> extractPartitionValuesInPath(String partitionPath) {
+    // If the partitionPath is an empty string (which means a non-partitioned table), the partition values

Review comment:
       again something that can go in its own PR?
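For context on the empty-path case the comment describes, here is a hedged Java sketch mirroring `extractPartitionValuesInPath`'s signature. The class name and the hive-style `key=value` handling are illustrative assumptions, not the actual Hudi implementation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PartitionValueSketch {
    // Illustrative stand-in for MultiPartKeysValueExtractor#extractPartitionValuesInPath:
    // an empty partitionPath (i.e. a non-partitioned table) yields no partition values,
    // instead of the single empty segment that String#split would produce.
    public static List<String> extractPartitionValuesInPath(String partitionPath) {
        if (partitionPath == null || partitionPath.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> values = new ArrayList<>();
        for (String part : partitionPath.split("/")) {
            // Accept both hive-style "key=value" segments and plain values.
            int eq = part.indexOf('=');
            values.add(eq >= 0 ? part.substring(eq + 1) : part);
        }
        return values;
    }
}
```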

##########
File path: hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/mergeInto.scala
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
+import org.apache.spark.sql.types.DataType
+
+// This code is copied from v2Commands.scala in Spark 3.0.
+
+/**
+  * The logical plan of the MERGE INTO command that works for v2 tables.
+  */
+case class MergeIntoTable(

Review comment:
       name the file as `MergeInto.scala`? 

##########
File path: hudi-spark-datasource/hudi-spark2/src/main/antlr4/imports/SqlBase.g4
##########
@@ -0,0 +1,1099 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This file is an adaptation of Presto's presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 grammar.

Review comment:
       Can we add this to NOTICE? There are examples there. What's the license of this file?

##########
File path: hudi-spark-datasource/hudi-spark2/pom.xml
##########
@@ -29,6 +29,8 @@
 
   <properties>
     <main.basedir>${project.parent.parent.basedir}</main.basedir>
+    <scala.binary.version>2.11</scala.binary.version>

Review comment:
       Can you please explain the reason for adding this?

##########
File path: hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdateTable.scala
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+// This code is copied from v2Commands.scala in Spark 3.0.
+case class UpdateTable(

Review comment:
       My understanding is that once/if we deprecate Spark 2, these are no longer needed?

##########
File path: hudi-spark-datasource/hudi-spark2/src/main/antlr4/imports/SqlBase.g4
##########
@@ -0,0 +1,1099 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This file is an adaptation of Presto's presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 grammar.

Review comment:
       What are our own additions on top? Can you please leave comments on the lines you changed over the Presto file?

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+    val tableName = generateTableName
+    // Create a managed table
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         | ) using hudi
+         | options (
+         |   primaryKey = 'id',
+         |   versionColumn = 'ts'

Review comment:
       This is the precombine field, right?

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.payload
+
+import java.util.{Base64, Properties}
+import java.util.concurrent.Callable
+
+import scala.collection.JavaConverters._
+import com.google.common.cache.CacheBuilder
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.io.HoodieWriteHandle
+import org.apache.hudi.sql.IExpressionEvaluator
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.hudi.SerDeUtils
+import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * A HoodieRecordPayload for MergeIntoHoodieTableCommand.
+ * It will execute the condition and assignments expression in the
+ * match and not-match actions and compute the final record to write.
+ *
+ * If no condition matches the record, ExpressionPayload will return
+ * a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this record.
+ */
+class ExpressionPayload(record: GenericRecord,
+                        orderingVal: Comparable[_])
+  extends DefaultHoodieRecordPayload(record, orderingVal) {
+
+  def this(recordOpt: HOption[GenericRecord]) {
+    this(recordOpt.orElse(null), 0)
+  }
+
+  /**
+   * The schema of this table.
+   */
+  private var writeSchema: Schema = _
+
+  override def combineAndGetUpdateValue(currentValue: IndexedRecord,
+                                        schema: Schema): HOption[IndexedRecord] = {
+    throw new IllegalStateException(s"Should not call this method for ${getClass.getCanonicalName}")
+  }
+
+  override def getInsertValue(schema: Schema): HOption[IndexedRecord] = {
+    throw new IllegalStateException(s"Should not call this method for ${getClass.getCanonicalName}")
+  }
+
+  override def combineAndGetUpdateValue(targetRecord: IndexedRecord,
+                                        schema: Schema, properties: Properties): HOption[IndexedRecord] = {
+    val sourceRecord = bytesToAvro(recordBytes, schema)
+    val joinSqlRecord = new SqlTypedRecord(joinRecord(sourceRecord, targetRecord))
+
+    // Process update
+    val updateConditionAndAssignmentsText =
+      properties.get(ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS)
+    assert(updateConditionAndAssignmentsText != null,
+      s"${ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS} has not been set")
+
+    var resultRecordOpt: HOption[IndexedRecord] = null
+
+    // Get the Evaluator for each condition and update assignments.
+    val updateConditionAndAssignments = getEvaluator(updateConditionAndAssignmentsText.toString)
+    for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments
+         if resultRecordOpt == null) {
+      val conditionVal = evaluate(conditionEvaluator, joinSqlRecord).head.asInstanceOf[Boolean]
+      // If the update condition matches, then execute the assignment expressions
+      // to compute the final record to update. We will return the first matched record.
+      if (conditionVal) {
+        val results = evaluate(assignmentEvaluator, joinSqlRecord)
+        initWriteSchemaIfNeed(properties)
+        val resultRecord = convertToRecord(results, writeSchema)
+
+        if (needUpdatePersistedRecord(targetRecord, resultRecord, properties)) {
+          resultRecordOpt = HOption.of(resultRecord)
+        } else {
+          // If the PreCombine field value of targetRecord is greater
+          // than the new incoming record's, just keep the old record value.
+          resultRecordOpt = HOption.of(targetRecord)
+        }
+      }
+    }
+    if (resultRecordOpt == null) {
+      // Process delete
+      val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
+      if (deleteConditionText != null) {
+        val deleteCondition = getEvaluator(deleteConditionText.toString).head._1
+        val deleteConditionVal = evaluate(deleteCondition, joinSqlRecord).head.asInstanceOf[Boolean]
+        if (deleteConditionVal) {
+          resultRecordOpt = HOption.empty()
+        }
+      }
+    }
+    if (resultRecordOpt == null) {
+      // If no condition matched, just filter out this record.
+      // Here we return an IGNORE_RECORD, which HoodieMergeHandle will not process.
+      HOption.of(HoodieWriteHandle.IGNORE_RECORD)

Review comment:
       I understand how you are using this now. Let me think about a better way of extending the RecordPayload APIs, if possible.
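To follow the control flow being reviewed, here is a minimal Java sketch of the three-way outcome (first matching update wins, then the delete condition, else an "ignore" sentinel). The `IGNORE` sentinel, the functional types, and the class name are illustrative stand-ins, not the actual Hudi `RecordPayload` API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

public class MergeFlowSketch {
    // Sentinel standing in for HoodieWriteHandle.IGNORE_RECORD: "no action matched".
    public static final Map<String, Object> IGNORE = new HashMap<>();

    // Mirrors the combineAndGetUpdateValue flow described above:
    // 1) the first matching update condition produces the assigned record;
    // 2) otherwise a matching delete condition yields Optional.empty();
    // 3) otherwise the sentinel tells the write handle to skip the record.
    public static Optional<Map<String, Object>> merge(
            Map<String, Object> joined,
            List<Map.Entry<Predicate<Map<String, Object>>,
                           Function<Map<String, Object>, Map<String, Object>>>> updates,
            Predicate<Map<String, Object>> deleteCondition) {
        for (Map.Entry<Predicate<Map<String, Object>>,
                       Function<Map<String, Object>, Map<String, Object>>> e : updates) {
            if (e.getKey().test(joined)) {
                return Optional.of(e.getValue().apply(joined));
            }
        }
        if (deleteCondition != null && deleteCondition.test(joined)) {
            return Optional.empty();  // delete the record
        }
        return Optional.of(IGNORE);   // no condition matched: ignore
    }
}
```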

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+    val tableName = generateTableName
+    // Create a managed table
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         | ) using hudi
+         | options (
+         |   primaryKey = 'id',
+         |   versionColumn = 'ts'

Review comment:
       Can we stick close to the parameter names used in the datasource write? We need not use the entire property name, but can we use the same terms, like `precombineColumn=ts`, instead?

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+    val tableName = generateTableName
+    // Create a managed table
+    spark.sql(
+      s"""
+         | create table $tableName (

Review comment:
       what catalog is the table definition stored in?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
