vinothchandar commented on a change in pull request #2645: URL: https://github.com/apache/hudi/pull/2645#discussion_r619602717
########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import scala.collection.JavaConverters._ +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField} + +class TestCreateTable extends TestHoodieSqlBase { + + test("Test Create Managed Hoodie Table") { + val tableName = generateTableName + // Create a managed table + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | ts long + | ) using hudi + | options ( + | primaryKey = 'id', + | versionColumn = 'ts' Review comment: this is the precombine field right? 
########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.payload + +import java.util.{Base64, Properties} +import java.util.concurrent.Callable + +import scala.collection.JavaConverters._ +import com.google.common.cache.CacheBuilder +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord} +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord} +import org.apache.hudi.common.util.{ValidationUtils, Option => HOption} +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.io.HoodieWriteHandle +import org.apache.hudi.sql.IExpressionEvaluator +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.Assignment +import org.apache.spark.sql.hudi.SerDeUtils +import 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.mutable.ArrayBuffer + +/** + * A HoodieRecordPayload for MergeIntoHoodieTableCommand. + * It will execute the condition and assignments expression in the + * match and not-match actions and compute the final record to write. + * + * If there is no condition match the record, ExpressionPayload will return + * a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this record. + */ +class ExpressionPayload(record: GenericRecord, + orderingVal: Comparable[_]) + extends DefaultHoodieRecordPayload(record, orderingVal) { + + def this(recordOpt: HOption[GenericRecord]) { + this(recordOpt.orElse(null), 0) + } + + /** + * The schema of this table. + */ + private var writeSchema: Schema = _ + + override def combineAndGetUpdateValue(currentValue: IndexedRecord, + schema: Schema): HOption[IndexedRecord] = { + throw new IllegalStateException(s"Should not call this method for ${getClass.getCanonicalName}") + } + + override def getInsertValue(schema: Schema): HOption[IndexedRecord] = { + throw new IllegalStateException(s"Should not call this method for ${getClass.getCanonicalName}") + } + + override def combineAndGetUpdateValue(targetRecord: IndexedRecord, + schema: Schema, properties: Properties): HOption[IndexedRecord] = { + val sourceRecord = bytesToAvro(recordBytes, schema) + val joinSqlRecord = new SqlTypedRecord(joinRecord(sourceRecord, targetRecord)) + + // Process update + val updateConditionAndAssignmentsText = + properties.get(ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS) + assert(updateConditionAndAssignmentsText != null, + s"${ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS} have not set") + + var resultRecordOpt: HOption[IndexedRecord] = null + + // Get the Evaluator for each condition and update assignments. 
+ val updateConditionAndAssignments = getEvaluator(updateConditionAndAssignmentsText.toString) + for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments + if resultRecordOpt == null) { + val conditionVal = evaluate(conditionEvaluator, joinSqlRecord).head.asInstanceOf[Boolean] + // If the update condition matched then execute assignment expression + // to compute final record to update. We will return the first matched record. + if (conditionVal) { + val results = evaluate(assignmentEvaluator, joinSqlRecord) + initWriteSchemaIfNeed(properties) + val resultRecord = convertToRecord(results, writeSchema) + + if (needUpdatePersistedRecord(targetRecord, resultRecord, properties)) { + resultRecordOpt = HOption.of(resultRecord) + } else { + // if the PreCombine field value of targetRecord is greater + // than the new incoming record, just keep the old record value. + resultRecordOpt = HOption.of(targetRecord) + } + } + } + if (resultRecordOpt == null) { + // Process delete + val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) + if (deleteConditionText != null) { + val deleteCondition = getEvaluator(deleteConditionText.toString).head._1 + val deleteConditionVal = evaluate(deleteCondition, joinSqlRecord).head.asInstanceOf[Boolean] + if (deleteConditionVal) { + resultRecordOpt = HOption.empty() + } + } + } + if (resultRecordOpt == null) { + // If there is no condition matched, just filter this record. + // here we return a IGNORE_RECORD, HoodieMergeHandle will not handle it. + HOption.of(HoodieWriteHandle.IGNORE_RECORD) Review comment: I understand how you are using this now. Let me think about a better way of extending the RecordPayload APIs, if possible ########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import scala.collection.JavaConverters._ +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField} + +class TestCreateTable extends TestHoodieSqlBase { + + test("Test Create Managed Hoodie Table") { + val tableName = generateTableName + // Create a managed table + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | ts long + | ) using hudi + | options ( + | primaryKey = 'id', + | versionColumn = 'ts' Review comment: can we stick close to the parameter names used in the datasource write? We need not use the entire property name, but can we use the same terms like `precombineColumn=ts` instead? ########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import scala.collection.JavaConverters._ +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField} + +class TestCreateTable extends TestHoodieSqlBase { + + test("Test Create Managed Hoodie Table") { + val tableName = generateTableName + // Create a managed table + spark.sql( + s""" + | create table $tableName ( Review comment: what catalog is the table definition stored in? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
