This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 800639e361b [HUDI-6450] Fix null strings handling in convertRowToJsonString (#9064)
800639e361b is described below

commit 800639e361b7d8b00031d56e07e6cb45e115eb28
Author: Tomasz Zarna <[email protected]>
AuthorDate: Wed Jul 5 12:18:37 2023 +0200

    [HUDI-6450] Fix null strings handling in convertRowToJsonString (#9064)
    
    * Fix by handling null strings in convertRowToJsonString
    * Instantiate InternalRowToJsonStringConverter only once
    * Depending on Jackson version/configuration null values are serialized or not
    
    ---------
    
    Co-authored-by: Tomasz Zarna <[email protected]>
---
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   | 26 ++------
 .../cdc/InternalRowToJsonStringConverter.scala     | 51 ++++++++++++++
 .../cdc/TestInternalRowToJsonStringConverter.scala | 78 ++++++++++++++++++++++
 3 files changed, 133 insertions(+), 22 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index b42e6f88007..839b02828d0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.cdc
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
 import org.apache.hadoop.fs.Path
@@ -47,7 +44,7 @@ import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Projection
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{Partition, SerializableWritable, TaskContext}
 
@@ -157,14 +154,6 @@ class HoodieCDCRDD(
       )
     }
 
-    private lazy val mapper: ObjectMapper = {
-      val _mapper = new ObjectMapper
-      _mapper.setSerializationInclusion(Include.NON_ABSENT)
-      _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
-      _mapper.registerModule(DefaultScalaModule)
-      _mapper
-    }
-
     protected override val avroSchema: Schema = new Schema.Parser().parse(originTableSchema.avroSchemaStr)
 
     protected override val structTypeSchema: StructType = originTableSchema.structTypeSchema
@@ -245,6 +234,8 @@ class HoodieCDCRDD(
      */
     private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
 
+    private var internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema)
+
     private def needLoadNextFile: Boolean = {
       !recordIter.hasNext &&
         !logRecordIter.hasNext &&
@@ -557,16 +548,7 @@ class HoodieCDCRDD(
      * Convert InternalRow to json string.
      */
     private def convertRowToJsonString(record: InternalRow): UTF8String = {
-      val map = scala.collection.mutable.Map.empty[String, Any]
-      originTableSchema.structTypeSchema.zipWithIndex.foreach {
-        case (field, idx) =>
-          if (field.dataType.isInstanceOf[StringType]) {
-            map(field.name) = record.getString(idx)
-          } else {
-            map(field.name) = record.get(idx, field.dataType)
-          }
-      }
-      convertToUTF8String(mapper.writeValueAsString(map))
+      internalRowToJsonStringConverter.convert(record)
     }
 
     /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
new file mode 100644
index 00000000000..c0f52a7ca12
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.hudi.HoodieTableSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
+
+class InternalRowToJsonStringConverter(originTableSchema: HoodieTableSchema) {
+
+  private lazy val mapper: ObjectMapper = {
+    val _mapper = new ObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def convert(record: InternalRow): UTF8String = {
+    val map = scala.collection.mutable.Map.empty[String, Any]
+    originTableSchema.structTypeSchema.zipWithIndex.foreach {
+      case (field, idx) =>
+        if (field.dataType.isInstanceOf[StringType]) {
+          map(field.name) = Option(record.getUTF8String(idx)).map(_.toString).orNull
+        } else {
+          map(field.name) = record.get(idx, field.dataType)
+        }
+    }
+    UTF8String.fromString(mapper.writeValueAsString(map))
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
new file mode 100644
index 00000000000..b9e5d09a81a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.hudi.HoodieTableSchema
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+class TestInternalRowToJsonStringConverter {
+  private val converter = new InternalRowToJsonStringConverter(hoodieTableSchema)
+
+  @Test
+  def emptyRow(): Unit = {
+    val converter = new InternalRowToJsonStringConverter(emptyHoodieTableSchema)
+    val row = InternalRow.empty
+    val converted = converter.convert(row)
+    assertEquals("{}", converted.toString)
+  }
+
+  @Test
+  def nonEmptyRow(): Unit = {
+    val row = InternalRow.fromSeq(Seq(1, UTF8String.fromString("foo")))
+    val converted = converter.convert(row)
+    assertEquals("""{"name":"foo","uuid":1}""", converted.toString)
+  }
+
+  @Test
+  def emptyString(): Unit = {
+    val row = InternalRow.fromSeq(Seq(1, UTF8String.EMPTY_UTF8))
+    val converted = converter.convert(row)
+    assertEquals("""{"name":"","uuid":1}""", converted.toString)
+  }
+
+  @Test
+  def nullString(): Unit = {
+    val row = InternalRow.fromSeq(Seq(1, null))
+    val converted = converter.convert(row)
+    assertTrue(converted.toString.equals("""{"uuid":1}""") || converted.toString.equals("""{"name":null,"uuid":1}"""))
+  }
+
+  private def hoodieTableSchema: HoodieTableSchema = {
+    val structTypeSchema = new StructType(Array[StructField](
+      StructField("uuid", DataTypes.IntegerType, nullable = false, Metadata.empty),
+      StructField("name", DataTypes.StringType, nullable = true, Metadata.empty)))
+    val avroSchemaStr: String =
+      """{"type": "record", "name": "test", "fields": [
+        |{"name": "uuid", "type": "int"},
+        |{"name": "name", "type": "string"}
+        |]}""".stripMargin
+    HoodieTableSchema(structTypeSchema, avroSchemaStr, Option.empty[InternalSchema])
+  }
+
+  private def emptyHoodieTableSchema: HoodieTableSchema = {
+    val structTypeSchema = new StructType()
+    val avroSchemaStr = """{"type": "record", "name": "test", "fields": []}"""
+    HoodieTableSchema(structTypeSchema, avroSchemaStr, Option.empty[InternalSchema])
+  }
+}

Reply via email to