This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b77eff2522a [HUDI-7120] Performance improvements in deltastreamer executor code path (#10135)
b77eff2522a is described below

commit b77eff2522a975b0c456332d20eaea6eed882774
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Nov 23 10:47:40 2023 +0530

    [HUDI-7120] Performance improvements in deltastreamer executor code path (#10135)
---
 .../hudi/io/HoodieKeyLocationFetchHandle.java      |   4 +-
 .../org/apache/hudi/AvroConversionUtils.scala      |   9 +
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |  22 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  58 +++--
 .../java/org/apache/hudi/common/fs/FSUtils.java    |   9 +-
 .../org/apache/hudi/TestAvroConversionUtils.scala  | 248 +++++++++++----------
 6 files changed, 186 insertions(+), 164 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 135e4866cc5..ab41a94c2a9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -62,9 +62,11 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> 
extends HoodieReadHandle<T
 
   public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+    String commitTime = baseFile.getCommitTime();
+    String fileId = baseFile.getFileId();
     return fetchRecordKeysWithPositions(baseFile).stream()
         .map(entry -> Pair.of(entry.getLeft(),
-            new HoodieRecordLocation(baseFile.getCommitTime(), 
baseFile.getFileId(), entry.getRight())));
+            new HoodieRecordLocation(commitTime, fileId, entry.getRight())));
   }
 
   public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 818bf760047..d84679eaf92 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -18,6 +18,7 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema.Type
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.{JsonProperties, Schema}
 import org.apache.hudi.HoodieSparkUtils.sparkAdapter
@@ -242,4 +243,12 @@ object AvroConversionUtils {
     val nameParts = qualifiedName.split('.')
     (nameParts.last, nameParts.init.mkString("."))
   }
+
+  private def handleUnion(schema: Schema): Schema = {
+    if (schema.getType == Type.UNION) {
+      val index = if (schema.getTypes.get(0).getType == Schema.Type.NULL) 1 
else 0
+      return schema.getTypes.get(index)
+    }
+    schema
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index fcfc8a4f0b9..3c5486c47c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -249,6 +249,11 @@ public class AvroSchemaUtils {
     }
 
     List<Schema> innerTypes = schema.getTypes();
+    if (innerTypes.size() == 2 && isNullable(schema)) {
+      // this is a basic nullable field so handle it more efficiently
+      return resolveNullableSchema(schema);
+    }
+
     Schema nonNullType =
         innerTypes.stream()
             .filter(it -> it.getType() != Schema.Type.NULL && 
Objects.equals(it.getFullName(), fieldSchemaFullName))
@@ -286,18 +291,19 @@ public class AvroSchemaUtils {
     }
 
     List<Schema> innerTypes = schema.getTypes();
-    Schema nonNullType =
-        innerTypes.stream()
-            .filter(it -> it.getType() != Schema.Type.NULL)
-            .findFirst()
-            .orElse(null);
 
-    if (innerTypes.size() != 2 || nonNullType == null) {
+    if (innerTypes.size() != 2) {
       throw new AvroRuntimeException(
           String.format("Unsupported Avro UNION type %s: Only UNION of a null 
type and a non-null type is supported", schema));
     }
-
-    return nonNullType;
+    Schema firstInnerType = innerTypes.get(0);
+    Schema secondInnerType = innerTypes.get(1);
+    if ((firstInnerType.getType() != Schema.Type.NULL && 
secondInnerType.getType() != Schema.Type.NULL)
+        || (firstInnerType.getType() == Schema.Type.NULL && 
secondInnerType.getType() == Schema.Type.NULL)) {
+      throw new AvroRuntimeException(
+          String.format("Unsupported Avro UNION type %s: Only UNION of a null 
type and a non-null type is supported", schema));
+    }
+    return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType : 
firstInnerType;
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 54c37b333e7..3800d9c1053 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -265,7 +265,8 @@ public class HoodieAvroUtils {
    * @param withOperationField Whether to include the '_hoodie_operation' field
    */
   public static Schema addMetadataFields(Schema schema, boolean 
withOperationField) {
-    List<Schema.Field> parentFields = new ArrayList<>();
+    int newFieldsSize = HoodieRecord.HOODIE_META_COLUMNS.size() + 
(withOperationField ? 1 : 0);
+    List<Schema.Field> parentFields = new 
ArrayList<>(schema.getFields().size() + newFieldsSize);
 
     Schema.Field commitTimeField =
         new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
@@ -439,12 +440,6 @@ public class HoodieAvroUtils {
         copyOldValueOrSetDefault(oldRecord, newRecord, f);
       }
     }
-
-    if (!ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) {
-      throw new SchemaCompatibilityException(
-          "Unable to validate the rewritten record " + oldRecord + " against 
schema " + newSchema);
-    }
-
     return newRecord;
   }
 
@@ -455,10 +450,6 @@ public class HoodieAvroUtils {
     }
     // do not preserve FILENAME_METADATA_FIELD
     newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName);
-    if (!GenericData.get().validate(newSchema, newRecord)) {
-      throw new SchemaCompatibilityException(
-          "Unable to validate the rewritten record " + genericRecord + " 
against schema " + newSchema);
-    }
     return newRecord;
   }
 
@@ -494,7 +485,7 @@ public class HoodieAvroUtils {
   private static void copyOldValueOrSetDefault(GenericRecord oldRecord, 
GenericRecord newRecord, Schema.Field field) {
     Schema oldSchema = oldRecord.getSchema();
     Field oldSchemaField = oldSchema.getField(field.name());
-    Object fieldValue = oldSchemaField == null ? null : 
oldRecord.get(field.name());
+    Object fieldValue = oldSchemaField == null ? null : 
oldRecord.get(oldSchemaField.pos());
 
     if (fieldValue != null) {
       // In case field's value is a nested record, we have to rewrite it as 
well
@@ -508,11 +499,14 @@ public class HoodieAvroUtils {
       } else {
         newFieldValue = fieldValue;
       }
-      newRecord.put(field.name(), newFieldValue);
+      newRecord.put(field.pos(), newFieldValue);
     } else if (field.defaultVal() instanceof JsonProperties.Null) {
-      newRecord.put(field.name(), null);
+      newRecord.put(field.pos(), null);
     } else {
-      newRecord.put(field.name(), field.defaultVal());
+      if (!isNullable(field.schema()) && field.defaultVal() == null) {
+        throw new SchemaCompatibilityException("Field " + field.name() + " has 
no default value and is null in old record");
+      }
+      newRecord.put(field.pos(), field.defaultVal());
     }
   }
 
@@ -562,7 +556,8 @@ public class HoodieAvroUtils {
    * it is consistent with avro after 1.10
    */
   public static Object getFieldVal(GenericRecord record, String key, boolean 
returnNullIfNotFound) {
-    if (record.getSchema().getField(key) == null) {
+    Schema.Field field = record.getSchema().getField(key);
+    if (field == null) {
       if (returnNullIfNotFound) {
         return null;
       } else {
@@ -572,7 +567,7 @@ public class HoodieAvroUtils {
         throw new AvroRuntimeException("Not a valid schema field: " + key);
       }
     } else {
-      return record.get(key);
+      return record.get(field.pos());
     }
   }
 
@@ -874,7 +869,8 @@ public class HoodieAvroUtils {
     }
     // try to get real schema for union type
     Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
-    Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, 
oldSchema, newSchema, renameCols, fieldNames, validate);
+    Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, 
oldSchema, newSchema, renameCols, fieldNames);
+    // validation is recursive so it only needs to be called on the original 
input
     if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, 
newRecord)) {
       throw new SchemaCompatibilityException(
           "Unable to validate the rewritten record " + oldRecord + " against 
schema " + newSchema);
@@ -882,7 +878,7 @@ public class HoodieAvroUtils {
     return newRecord;
   }
 
-  private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, 
Schema oldSchema, Schema newSchema, Map<String, String> renameCols, 
Deque<String> fieldNames, boolean validate) {
+  private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, 
Schema oldSchema, Schema newSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
     switch (newSchema.getType()) {
       case RECORD:
         ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, 
"cannot rewrite record with different type");
@@ -893,17 +889,17 @@ public class HoodieAvroUtils {
           Schema.Field field = fields.get(i);
           String fieldName = field.name();
           fieldNames.push(fieldName);
-          if (oldSchema.getField(field.name()) != null && 
!renameCols.containsKey(field.name())) {
-            Schema.Field oldField = oldSchema.getField(field.name());
-            newRecord.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
+          Schema.Field oldField = oldSchema.getField(field.name());
+          if (oldField != null && !renameCols.containsKey(field.name())) {
+            newRecord.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, false));
           } else {
             String fieldFullName = createFullName(fieldNames);
-            String fieldNameFromOldSchema = 
renameCols.getOrDefault(fieldFullName, "");
+            String fieldNameFromOldSchema = renameCols.get(fieldFullName);
             // deal with rename
-            if (oldSchema.getField(fieldNameFromOldSchema) != null) {
+            Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ? 
null : oldSchema.getField(fieldNameFromOldSchema);
+            if (oldFieldRenamed != null) {
               // find rename
-              Schema.Field oldField = 
oldSchema.getField(fieldNameFromOldSchema);
-              newRecord.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
+              newRecord.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldFieldRenamed.pos()), 
oldFieldRenamed.schema(), fields.get(i).schema(), renameCols, fieldNames, 
false));
             } else {
               // deal with default value
               if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
@@ -927,25 +923,25 @@ public class HoodieAvroUtils {
       case ARRAY:
         ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot 
rewrite record with different type");
         Collection array = (Collection) oldRecord;
-        List<Object> newArray = new ArrayList(array.size());
+        List<Object> newArray = new ArrayList<>(array.size());
         fieldNames.push("element");
         for (Object element : array) {
-          newArray.add(rewriteRecordWithNewSchema(element, 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, 
validate));
+          newArray.add(rewriteRecordWithNewSchema(element, 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, 
false));
         }
         fieldNames.pop();
         return newArray;
       case MAP:
         ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot 
rewrite record with different type");
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
-        Map<Object, Object> newMap = new HashMap<>(map.size(), 1);
+        Map<Object, Object> newMap = new HashMap<>(map.size(), 1.0f);
         fieldNames.push("value");
         for (Map.Entry<Object, Object> entry : map.entrySet()) {
-          newMap.put(entry.getKey(), 
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), 
newSchema.getValueType(), renameCols, fieldNames, validate));
+          newMap.put(entry.getKey(), 
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), 
newSchema.getValueType(), renameCols, fieldNames, false));
         }
         fieldNames.pop();
         return newMap;
       case UNION:
-        return rewriteRecordWithNewSchema(oldRecord, 
getActualSchemaFromUnion(oldSchema, oldRecord), 
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames, 
validate);
+        return rewriteRecordWithNewSchema(oldRecord, 
getActualSchemaFromUnion(oldSchema, oldRecord), 
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames, false);
       default:
         return rewritePrimaryType(oldRecord, oldSchema, newSchema);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 954fe75a0ac..770c811a2e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -87,6 +87,8 @@ public class FSUtils {
   private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
   private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
 
+  private static final String LOG_FILE_EXTENSION = ".log";
+
   private static final PathFilter ALLOW_ALL_FILTER = file -> true;
 
   public static Configuration prepareHadoopConf(Configuration conf) {
@@ -474,8 +476,11 @@ public class FSUtils {
   }
 
   public static boolean isLogFile(String fileName) {
-    Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
-    return fileName.contains(".log") && matcher.find();
+    if (fileName.contains(LOG_FILE_EXTENSION)) {
+      Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
+      return matcher.find();
+    }
+    return false;
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
index 70d74f0a0ce..5cd6ac3954e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -30,146 +30,150 @@ import org.scalatest.{FunSuite, Matchers}
 class TestAvroConversionUtils extends FunSuite with Matchers {
 
 
-  test("test convertStructTypeToAvroSchema") {
-    val mapType = DataTypes.createMapType(StringType, new 
StructType().add("mapKey", "string", false).add("mapVal", "integer", true))
-    val arrayType =  ArrayType(new StructType().add("arrayKey", "string", 
false).add("arrayVal", "integer", true))
-    val innerStruct = new 
StructType().add("innerKey","string",false).add("value", "long", true)
-
-    val struct = new StructType().add("key", "string", false).add("version", 
"string", true)
-      .add("data1",innerStruct,false).add("data2",innerStruct,true)
-      .add("nullableMap", mapType, true).add("map",mapType,false)
-      .add("nullableArray", arrayType, true).add("array",arrayType,false)
-
-    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, 
"SchemaName", "SchemaNS")
-
-    val expectedSchemaStr = s"""
-       {
-         "type" : "record",
-         "name" : "SchemaName",
-         "namespace" : "SchemaNS",
-         "fields" : [ {
-           "name" : "key",
-           "type" : "string"
-         }, {
-           "name" : "version",
-           "type" : [ "null", "string" ],
-           "default" : null
-         }, {
+  val complexSchemaStr =
+    s"""
+    {
+       "type" : "record",
+       "name" : "SchemaName",
+       "namespace" : "SchemaNS",
+       "fields" : [ {
+         "name" : "key",
+         "type" : "string"
+       }, {
+         "name" : "version",
+         "type" : [ "null", "string" ],
+         "default" : null
+       }, {
+         "name" : "data1",
+         "type" : {
+           "type" : "record",
            "name" : "data1",
-           "type" : {
+           "namespace" : "SchemaNS.SchemaName",
+           "fields" : [ {
+             "name" : "innerKey",
+             "type" : "string"
+           }, {
+             "name" : "value",
+             "type" : [ "null", "long" ],
+             "default" : null
+           } ]
+         }
+       }, {
+         "name" : "data2",
+         "type" : [ "null", {
+           "type" : "record",
+           "name" : "data2",
+           "namespace" : "SchemaNS.SchemaName",
+           "fields" : [ {
+             "name" : "innerKey",
+             "type" : "string"
+           }, {
+             "name" : "value",
+             "type" : [ "null", "long" ],
+             "default" : null
+           } ]
+         } ],
+         "default" : null
+       }, {
+         "name" : "nullableMap",
+         "type" : [ "null", {
+           "type" : "map",
+           "values" : [
+           "null",
+           {
              "type" : "record",
-             "name" : "data1",
+             "name" : "nullableMap",
              "namespace" : "SchemaNS.SchemaName",
              "fields" : [ {
-               "name" : "innerKey",
+               "name" : "mapKey",
                "type" : "string"
              }, {
-               "name" : "value",
-               "type" : [ "null", "long" ],
+               "name" : "mapVal",
+               "type" : [ "null", "int" ],
                "default" : null
              } ]
-           }
-         }, {
-           "name" : "data2",
-           "type" : [ "null", {
+           } ]
+         } ],
+         "default" : null
+       }, {
+         "name" : "map",
+         "type" : {
+           "type" : "map",
+           "values" : [
+           "null",
+           {
              "type" : "record",
-             "name" : "data2",
+             "name" : "map",
              "namespace" : "SchemaNS.SchemaName",
              "fields" : [ {
-               "name" : "innerKey",
+               "name" : "mapKey",
                "type" : "string"
              }, {
-               "name" : "value",
-               "type" : [ "null", "long" ],
+               "name" : "mapVal",
+               "type" : [ "null", "int" ],
                "default" : null
              } ]
-           } ],
-           "default" : null
-         }, {
-           "name" : "nullableMap",
-           "type" : [ "null", {
-             "type" : "map",
-             "values" : [
-             "null",
-             {
-               "type" : "record",
-               "name" : "nullableMap",
-               "namespace" : "SchemaNS.SchemaName",
-               "fields" : [ {
-                 "name" : "mapKey",
-                 "type" : "string"
-               }, {
-                 "name" : "mapVal",
-                 "type" : [ "null", "int" ],
-                 "default" : null
-               } ]
-             } ]
-           } ],
-           "default" : null
-         }, {
-           "name" : "map",
-           "type" : {
-             "type" : "map",
-             "values" : [
-             "null",
-             {
-               "type" : "record",
-               "name" : "map",
-               "namespace" : "SchemaNS.SchemaName",
-               "fields" : [ {
-                 "name" : "mapKey",
-                 "type" : "string"
-               }, {
-                 "name" : "mapVal",
-                 "type" : [ "null", "int" ],
-                 "default" : null
-               } ]
-             } ]
-           }
-         }, {
-           "name" : "nullableArray",
-           "type" : [ "null", {
-             "type" : "array",
-             "items" : [
-             "null",
-             {
-               "type" : "record",
-               "name" : "nullableArray",
-               "namespace" : "SchemaNS.SchemaName",
-               "fields" : [ {
-                 "name" : "arrayKey",
-                 "type" : "string"
-               }, {
-                 "name" : "arrayVal",
-                 "type" : [ "null", "int" ],
-                 "default" : null
-               } ]
+           } ]
+         }
+       }, {
+         "name" : "nullableArray",
+         "type" : [ "null", {
+           "type" : "array",
+           "items" : [
+           "null",
+           {
+             "type" : "record",
+             "name" : "nullableArray",
+             "namespace" : "SchemaNS.SchemaName",
+             "fields" : [ {
+               "name" : "arrayKey",
+               "type" : "string"
+             }, {
+               "name" : "arrayVal",
+               "type" : [ "null", "int" ],
+               "default" : null
              } ]
-           } ],
-           "default" : null
-         }, {
-           "name" : "array",
-           "type" : {
-             "type" : "array",
-             "items" : [
-             "null",
-             {
-               "type" : "record",
-               "name" : "array",
-               "namespace" : "SchemaNS.SchemaName",
-               "fields" : [ {
-                 "name" : "arrayKey",
-                 "type" : "string"
-               }, {
-                 "name" : "arrayVal",
-                 "type" : [ "null", "int" ],
-                 "default" : null
-               } ]
+           } ]
+         } ],
+         "default" : null
+       }, {
+         "name" : "array",
+         "type" : {
+           "type" : "array",
+           "items" : [
+           "null",
+           {
+             "type" : "record",
+             "name" : "array",
+             "namespace" : "SchemaNS.SchemaName",
+             "fields" : [ {
+               "name" : "arrayKey",
+               "type" : "string"
+             }, {
+               "name" : "arrayVal",
+               "type" : [ "null", "int" ],
+               "default" : null
              } ]
-           }
-         } ]
-       }
+           } ]
+         }
+       } ]
+     }
     """
+
+
+  test("test convertStructTypeToAvroSchema_orig") {
+    val mapType = DataTypes.createMapType(StringType, new 
StructType().add("mapKey", "string", false).add("mapVal", "integer", true))
+    val arrayType = ArrayType(new StructType().add("arrayKey", "string", 
false).add("arrayVal", "integer", true))
+    val innerStruct = new StructType().add("innerKey", "string", 
false).add("value", "long", true)
+
+    val struct = new StructType().add("key", "string", false).add("version", 
"string", true)
+      .add("data1", innerStruct, false).add("data2", innerStruct, true)
+      .add("nullableMap", mapType, true).add("map", mapType, false)
+      .add("nullableArray", arrayType, true).add("array", arrayType, false)
+
+    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, 
"SchemaName", "SchemaNS")
+
+    val expectedSchemaStr = complexSchemaStr
     val expectedAvroSchema = new Schema.Parser().parse(expectedSchemaStr)
 
     assert(avroSchema.equals(expectedAvroSchema))

Reply via email to