vinothchandar commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r619592740
##########
File path: pom.xml
##########
@@ -112,6 +112,7 @@
<spark3.version>3.0.0</spark3.version>
<spark2bundle.version></spark2bundle.version>
<spark3bundle.version>3</spark3bundle.version>
+ <hudi.spark.moudle>hudi-spark2</hudi.spark.moudle>
Review comment:
`hudi.spark.module`? Is this change really essential for this PR, or can it go in a separate one?
##########
File path: packaging/hudi-integ-test-bundle/pom.xml
##########
@@ -73,8 +73,7 @@
<include>org.apache.hudi:hudi-spark-common</include>
<include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
<include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
- <include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include>
- <include>org.apache.hudi:hudi-spark3_2.12</include>
+ <include>org.apache.hudi:${hudi.spark.moudle}_${scala.binary.version}</include>
Review comment:
So you want to move towards an approach of picking the actual module to
bundle? I think we have too many properties for Spark 2 vs 3 already. We may
need to think about re-simplifying everything, actually, if this change is
indeed needed for this PR.
##########
File path:
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java
##########
@@ -31,6 +32,11 @@
@Override
public List<String> extractPartitionValuesInPath(String partitionPath) {
+ // If the partitionPath is empty string( which means none-partition table), the partition values
Review comment:
Again, something that can go in its own PR?
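Not blocking, but to make the empty-path case concrete: a minimal, hypothetical Scala sketch (not the Java implementation in this PR) of multi-part value extraction with the non-partitioned special case the new comment describes. The hive-style `key=value` handling is an assumption for illustration.

```scala
// Hypothetical sketch of the behavior under discussion: a multi-part
// partition path maps to its value components, and an empty path
// (non-partitioned table) maps to no values at all. Note "".split("/")
// would yield Array(""), hence the explicit empty-string check.
def extractPartitionValues(partitionPath: String): List[String] =
  if (partitionPath.isEmpty) List.empty // non-partitioned table
  else
    partitionPath.split("/").toList.map { segment =>
      // hive-style "key=value" segments contribute only the value
      val eq = segment.indexOf('=')
      if (eq >= 0) segment.substring(eq + 1) else segment
    }
```

The special case matters because without it a non-partitioned table would appear to have one empty partition value.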
##########
File path:
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/mergeInto.scala
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
+import org.apache.spark.sql.types.DataType
+
+// This code is just copy from v2Commands.scala in spark 3.0
+
+/**
+ * The logical plan of the MERGE INTO command that works for v2 tables.
+ */
+case class MergeIntoTable(
Review comment:
Name the file `MergeInto.scala`?
##########
File path: hudi-spark-datasource/hudi-spark2/src/main/antlr4/imports/SqlBase.g4
##########
@@ -0,0 +1,1099 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This file is an adaptation of Presto's presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 grammar.
Review comment:
Can we add this to NOTICE? There are examples there. What's the license
of this file?
##########
File path: hudi-spark-datasource/hudi-spark2/pom.xml
##########
@@ -29,6 +29,8 @@
<properties>
<main.basedir>${project.parent.parent.basedir}</main.basedir>
+ <scala.binary.version>2.11</scala.binary.version>
Review comment:
Can you please explain the reason for adding this?
##########
File path:
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdateTable.scala
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+// This code is just copy from v2Commands.scala in spark 3.0
+case class UpdateTable(
Review comment:
My understanding is that once/if we deprecate Spark 2, these are no
longer needed?
##########
File path: hudi-spark-datasource/hudi-spark2/src/main/antlr4/imports/SqlBase.g4
##########
@@ -0,0 +1,1099 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This file is an adaptation of Presto's presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 grammar.
Review comment:
What are our own additions on top? Can you please leave comments on the
lines you changed over the Presto file?
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+ test("Test Create Managed Hoodie Table") {
+ val tableName = generateTableName
+ // Create a managed table
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | options (
+ | primaryKey = 'id',
+ | versionColumn = 'ts'
Review comment:
This is the precombine field, right?
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.payload
+
+import java.util.{Base64, Properties}
+import java.util.concurrent.Callable
+
+import scala.collection.JavaConverters._
+import com.google.common.cache.CacheBuilder
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.io.HoodieWriteHandle
+import org.apache.hudi.sql.IExpressionEvaluator
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.hudi.SerDeUtils
+import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * A HoodieRecordPayload for MergeIntoHoodieTableCommand.
+ * It will execute the condition and assignments expression in the
+ * match and not-match actions and compute the final record to write.
+ *
+ * If there is no condition match the record, ExpressionPayload will return
+ * a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this record.
+ */
+class ExpressionPayload(record: GenericRecord,
+ orderingVal: Comparable[_])
+ extends DefaultHoodieRecordPayload(record, orderingVal) {
+
+ def this(recordOpt: HOption[GenericRecord]) {
+ this(recordOpt.orElse(null), 0)
+ }
+
+ /**
+ * The schema of this table.
+ */
+ private var writeSchema: Schema = _
+
+ override def combineAndGetUpdateValue(currentValue: IndexedRecord,
+ schema: Schema): HOption[IndexedRecord] = {
+ throw new IllegalStateException(s"Should not call this method for ${getClass.getCanonicalName}")
+ }
+
+ override def getInsertValue(schema: Schema): HOption[IndexedRecord] = {
+ throw new IllegalStateException(s"Should not call this method for ${getClass.getCanonicalName}")
+ }
+
+ override def combineAndGetUpdateValue(targetRecord: IndexedRecord,
+ schema: Schema, properties: Properties): HOption[IndexedRecord] = {
+ val sourceRecord = bytesToAvro(recordBytes, schema)
+ val joinSqlRecord = new SqlTypedRecord(joinRecord(sourceRecord, targetRecord))
+
+ // Process update
+ val updateConditionAndAssignmentsText =
+ properties.get(ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS)
+ assert(updateConditionAndAssignmentsText != null,
+ s"${ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS} have not set")
+
+ var resultRecordOpt: HOption[IndexedRecord] = null
+
+ // Get the Evaluator for each condition and update assignments.
+ val updateConditionAndAssignments = getEvaluator(updateConditionAndAssignmentsText.toString)
+ for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments
+ if resultRecordOpt == null) {
+ val conditionVal = evaluate(conditionEvaluator, joinSqlRecord).head.asInstanceOf[Boolean]
+ // If the update condition matched then execute assignment expression
+ // to compute final record to update. We will return the first matched record.
+ if (conditionVal) {
+ val results = evaluate(assignmentEvaluator, joinSqlRecord)
+ initWriteSchemaIfNeed(properties)
+ val resultRecord = convertToRecord(results, writeSchema)
+
+ if (needUpdatePersistedRecord(targetRecord, resultRecord, properties)) {
+ resultRecordOpt = HOption.of(resultRecord)
+ } else {
+ // if the PreCombine field value of targetRecord is greate
+ // than the new incoming record, just keep the old record value.
+ resultRecordOpt = HOption.of(targetRecord)
+ }
+ }
+ }
+ if (resultRecordOpt == null) {
+ // Process delete
+ val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
+ if (deleteConditionText != null) {
+ val deleteCondition = getEvaluator(deleteConditionText.toString).head._1
+ val deleteConditionVal = evaluate(deleteCondition, joinSqlRecord).head.asInstanceOf[Boolean]
+ if (deleteConditionVal) {
+ resultRecordOpt = HOption.empty()
+ }
+ }
+ }
+ if (resultRecordOpt == null) {
+ // If there is no condition matched, just filter this record.
+ // here we return a IGNORE_RECORD, HoodieMergeHandle will not handle it.
+ HOption.of(HoodieWriteHandle.IGNORE_RECORD)
Review comment:
I understand how you are using this now. Let me think about a better way
of extending the RecordPayload APIs, if possible.
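To make the discussion concrete, here is a minimal, hypothetical sketch (not the PR's code) of the evaluation order the payload above implements: the first matching update condition wins, then the optional delete condition is checked, otherwise the record is ignored. Pre-evaluated Booleans stand in for the compiled expression evaluators.

```scala
// Outcomes mirroring HOption.of(result) / HOption.empty() / IGNORE_RECORD.
sealed trait MergeResult
case class Update(values: Seq[Any]) extends MergeResult
case object Delete extends MergeResult
case object Ignore extends MergeResult

// updateActions pairs each (already-evaluated) condition with its
// assignment results; deleteConditionMatched is None when no delete
// clause was supplied.
def merge(updateActions: Seq[(Boolean, Seq[Any])],
          deleteConditionMatched: Option[Boolean]): MergeResult =
  updateActions
    .collectFirst { case (true, assigned) => Update(assigned): MergeResult }
    .getOrElse(
      if (deleteConditionMatched.contains(true)) Delete else Ignore
    )
```

A payload API extension could perhaps expose this three-way outcome directly instead of the IGNORE_RECORD sentinel.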
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+ test("Test Create Managed Hoodie Table") {
+ val tableName = generateTableName
+ // Create a managed table
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | options (
+ | primaryKey = 'id',
+ | versionColumn = 'ts'
Review comment:
Can we stick close to the parameter names used in the datasource write?
We need not use the entire property name, but can we use the same terms, like
`precombineColumn=ts` instead?
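For example, one way to do this is an alias table that accepts SQL-friendly option names and normalizes them to the datasource terms. The target key names below are assumptions based on the usual `hoodie.datasource.write.*` options, not taken from this PR.

```scala
// Hypothetical alias table: SQL option name -> datasource write option.
val optionAliases: Map[String, String] = Map(
  "primaryKey"       -> "hoodie.datasource.write.recordkey.field",
  "precombineColumn" -> "hoodie.datasource.write.precombine.field"
)

// Unknown keys pass through unchanged so existing full-name options keep working.
def normalizeOptions(opts: Map[String, String]): Map[String, String] =
  opts.map { case (k, v) => optionAliases.getOrElse(k, k) -> v }
```

That way the CREATE TABLE options and the DataFrame write path stay consistent without forcing users to spell out full property names.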
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+ test("Test Create Managed Hoodie Table") {
+ val tableName = generateTableName
+ // Create a managed table
+ spark.sql(
+ s"""
+ | create table $tableName (
Review comment:
what catalog is the table definition stored in?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]