This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 800639e361b [HUDI-6450] Fix null strings handling in
convertRowToJsonString (#9064)
800639e361b is described below
commit 800639e361b7d8b00031d56e07e6cb45e115eb28
Author: Tomasz Zarna <[email protected]>
AuthorDate: Wed Jul 5 12:18:37 2023 +0200
[HUDI-6450] Fix null strings handling in convertRowToJsonString (#9064)
* Fix by handling null strings in convertRowToJsonString
* Instantiate InternalRowToJsonStringConverter only once
* Depending on Jackson version/configuration null values are serialized or not
---------
Co-authored-by: Tomasz Zarna <[email protected]>
---
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 26 ++------
.../cdc/InternalRowToJsonStringConverter.scala | 51 ++++++++++++++
.../cdc/TestInternalRowToJsonStringConverter.scala | 78 ++++++++++++++++++++++
3 files changed, 133 insertions(+), 22 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index b42e6f88007..839b02828d0 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -18,9 +18,6 @@
package org.apache.hudi.cdc
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.hadoop.fs.Path
@@ -47,7 +44,7 @@ import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Projection
import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.{Partition, SerializableWritable, TaskContext}
@@ -157,14 +154,6 @@ class HoodieCDCRDD(
)
}
- private lazy val mapper: ObjectMapper = {
- val _mapper = new ObjectMapper
- _mapper.setSerializationInclusion(Include.NON_ABSENT)
- _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
- _mapper.registerModule(DefaultScalaModule)
- _mapper
- }
-
protected override val avroSchema: Schema = new
Schema.Parser().parse(originTableSchema.avroSchemaStr)
protected override val structTypeSchema: StructType =
originTableSchema.structTypeSchema
@@ -245,6 +234,8 @@ class HoodieCDCRDD(
*/
private var afterImageRecords: mutable.Map[String, InternalRow] =
mutable.Map.empty
+ private var internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema)
+
private def needLoadNextFile: Boolean = {
!recordIter.hasNext &&
!logRecordIter.hasNext &&
@@ -557,16 +548,7 @@ class HoodieCDCRDD(
* Convert InternalRow to json string.
*/
private def convertRowToJsonString(record: InternalRow): UTF8String = {
- val map = scala.collection.mutable.Map.empty[String, Any]
- originTableSchema.structTypeSchema.zipWithIndex.foreach {
- case (field, idx) =>
- if (field.dataType.isInstanceOf[StringType]) {
- map(field.name) = record.getString(idx)
- } else {
- map(field.name) = record.get(idx, field.dataType)
- }
- }
- convertToUTF8String(mapper.writeValueAsString(map))
+ internalRowToJsonStringConverter.convert(record)
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
new file mode 100644
index 00000000000..c0f52a7ca12
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.hudi.HoodieTableSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
+
+class InternalRowToJsonStringConverter(originTableSchema: HoodieTableSchema) {
+
+ private lazy val mapper: ObjectMapper = {
+ val _mapper = new ObjectMapper
+ _mapper.setSerializationInclusion(Include.NON_ABSENT)
+ _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ _mapper.registerModule(DefaultScalaModule)
+ _mapper
+ }
+
+ def convert(record: InternalRow): UTF8String = {
+ val map = scala.collection.mutable.Map.empty[String, Any]
+ originTableSchema.structTypeSchema.zipWithIndex.foreach {
+ case (field, idx) =>
+ if (field.dataType.isInstanceOf[StringType]) {
+ map(field.name) = Option(record.getUTF8String(idx)).map(_.toString).orNull
+ } else {
+ map(field.name) = record.get(idx, field.dataType)
+ }
+ }
+ UTF8String.fromString(mapper.writeValueAsString(map))
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
new file mode 100644
index 00000000000..b9e5d09a81a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.hudi.HoodieTableSchema
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+class TestInternalRowToJsonStringConverter {
+ private val converter = new InternalRowToJsonStringConverter(hoodieTableSchema)
+
+ @Test
+ def emptyRow(): Unit = {
+ val converter = new InternalRowToJsonStringConverter(emptyHoodieTableSchema)
+ val row = InternalRow.empty
+ val converted = converter.convert(row)
+ assertEquals("{}", converted.toString)
+ }
+
+ @Test
+ def nonEmptyRow(): Unit = {
+ val row = InternalRow.fromSeq(Seq(1, UTF8String.fromString("foo")))
+ val converted = converter.convert(row)
+ assertEquals("""{"name":"foo","uuid":1}""", converted.toString)
+ }
+
+ @Test
+ def emptyString(): Unit = {
+ val row = InternalRow.fromSeq(Seq(1, UTF8String.EMPTY_UTF8))
+ val converted = converter.convert(row)
+ assertEquals("""{"name":"","uuid":1}""", converted.toString)
+ }
+
+ @Test
+ def nullString(): Unit = {
+ val row = InternalRow.fromSeq(Seq(1, null))
+ val converted = converter.convert(row)
+ assertTrue(converted.toString.equals("""{"uuid":1}""") || converted.toString.equals("""{"name":null,"uuid":1}"""))
+ }
+
+ private def hoodieTableSchema: HoodieTableSchema = {
+ val structTypeSchema = new StructType(Array[StructField](
+ StructField("uuid", DataTypes.IntegerType, nullable = false, Metadata.empty),
+ StructField("name", DataTypes.StringType, nullable = true, Metadata.empty)))
+ val avroSchemaStr: String =
+ """{"type": "record", "name": "test", "fields": [
+ |{"name": "uuid", "type": "int"},
+ |{"name": "name", "type": "string"}
+ |]}""".stripMargin
+ HoodieTableSchema(structTypeSchema, avroSchemaStr, Option.empty[InternalSchema])
+ }
+
+ private def emptyHoodieTableSchema: HoodieTableSchema = {
+ val structTypeSchema = new StructType()
+ val avroSchemaStr = """{"type": "record", "name": "test", "fields": []}"""
+ HoodieTableSchema(structTypeSchema, avroSchemaStr, Option.empty[InternalSchema])
+ }
+}