This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d5444ff [HUDI-3018] Adding validation to dataframe scheme to ensure
reserved field does not have diff data type (#4852)
d5444ff is described below
commit d5444ff7ff832a2e14b5c78449713fdf82bcaec4
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Feb 27 11:59:23 2022 -0500
[HUDI-3018] Adding validation to dataframe scheme to ensure reserved field
does not have diff data type (#4852)
---
.../org/apache/hudi/AvroConversionUtils.scala | 1 -
.../org/apache/hudi/common/model/HoodieRecord.java | 1 +
.../model/OverwriteWithLatestAvroPayload.java | 2 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 16 ++++++++-----
.../apache/hudi/functional/TestCOWDataSource.scala | 26 +++++++++++++++++-----
5 files changed, 33 insertions(+), 13 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index a006eeb..62bcbf6 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -17,7 +17,6 @@
*/
package org.apache.hudi
-
import org.apache.avro.Schema.Type
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder,
IndexedRecord}
import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index f90448e..ac30766 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -40,6 +40,7 @@ public abstract class HoodieRecord<T> implements Serializable
{
public static final String PARTITION_PATH_METADATA_FIELD =
"_hoodie_partition_path";
public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name";
public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
+ public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted";
public static final List<String> HOODIE_META_COLUMNS =
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD,
COMMIT_SEQNO_METADATA_FIELD,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index 4be2e3e..7b7bd6c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -85,7 +85,7 @@ public class OverwriteWithLatestAvroPayload extends
BaseAvroPayload
* @returns {@code true} if record represents a delete record. {@code false}
otherwise.
*/
protected boolean isDeleteRecord(GenericRecord genericRecord) {
- final String isDeleteKey = "_hoodie_is_deleted";
+ final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED;
// Modify to be compatible with new version Avro.
// The new version Avro throws for GenericRecord.get if the field name
// does not exist in the schema.
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b7f04c5..89304d3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -19,11 +19,10 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
-
+import org.apache.avro.reflect.AvroSchema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
-
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
@@ -45,9 +44,7 @@ import
org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKey
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner
-
import org.apache.log4j.LogManager
-
import org.apache.spark.SPARK_VERSION
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -58,7 +55,6 @@ import org.apache.spark.sql._
import org.apache.spark.SparkContext
import java.util.Properties
-
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -242,6 +238,7 @@ object HoodieSparkSqlWriter {
if (reconcileSchema) {
schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
}
+ validateSchemaForHoodieIsDeleted(schema)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
@@ -432,6 +429,14 @@ object HoodieSparkSqlWriter {
}
}
+ def validateSchemaForHoodieIsDeleted(schema: Schema): Unit = {
+ if (schema.getField(HoodieRecord.HOODIE_IS_DELETED) != null &&
+
AvroConversionUtils.resolveAvroTypeNullability(schema.getField(HoodieRecord.HOODIE_IS_DELETED).schema())._2.getType
!= Schema.Type.BOOLEAN) {
+ throw new HoodieException(HoodieRecord.HOODIE_IS_DELETED + " has to be
BOOLEAN type. Passed in dataframe's schema has type "
+ + schema.getField(HoodieRecord.HOODIE_IS_DELETED).schema().getType)
+ }
+ }
+
def bulkInsertAsRow(sqlContext: SQLContext,
parameters: Map[String, String],
df: DataFrame,
@@ -454,6 +459,7 @@ object HoodieSparkSqlWriter {
if (dropPartitionColumns) {
schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
}
+ validateSchemaForHoodieIsDeleted(schema)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 58b36f8..96d50f6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,23 +18,21 @@
package org.apache.hudi.functional
import org.apache.hadoop.fs.FileSystem
-
import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import
org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings,
recordsToStrings}
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieUpsertException
+import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.HoodieClientTestBase
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
DataSourceWriteOptions, HoodieDataSourceHelpers}
-
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieMergeOnReadRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.types._
-
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue, fail}
@@ -44,7 +42,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import java.sql.{Date, Timestamp}
-
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -98,6 +95,23 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}
+
+ @Test def testHoodieIsDeletedNonBooleanField() {
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ val df = inputDF.withColumn(HoodieRecord.HOODIE_IS_DELETED, lit("abc"))
+
+ assertThrows(classOf[HoodieException], new Executable {
+ override def execute(): Unit = {
+ df.write.format("hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ }
+ }, "Should have failed since _hoodie_is_deleted is not a BOOLEAN data
type")
+ }
+
/**
* This tests the case that query by with a specified partition condition on
hudi table which is
* different between the value of the partition field and the actual
partition path,