This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b77eff2522a [HUDI-7120] Performance improvements in deltastreamer executor code path (#10135)
b77eff2522a is described below
commit b77eff2522a975b0c456332d20eaea6eed882774
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Nov 23 10:47:40 2023 +0530
[HUDI-7120] Performance improvements in deltastreamer executor code path (#10135)
---
.../hudi/io/HoodieKeyLocationFetchHandle.java | 4 +-
.../org/apache/hudi/AvroConversionUtils.scala | 9 +
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 22 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 58 +++--
.../java/org/apache/hudi/common/fs/FSUtils.java | 9 +-
.../org/apache/hudi/TestAvroConversionUtils.scala | 248 +++++++++++----------
6 files changed, 186 insertions(+), 164 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 135e4866cc5..ab41a94c2a9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -62,9 +62,11 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O>
extends HoodieReadHandle<T
public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+ String commitTime = baseFile.getCommitTime();
+ String fileId = baseFile.getFileId();
return fetchRecordKeysWithPositions(baseFile).stream()
.map(entry -> Pair.of(entry.getLeft(),
- new HoodieRecordLocation(baseFile.getCommitTime(),
baseFile.getFileId(), entry.getRight())));
+ new HoodieRecordLocation(commitTime, fileId, entry.getRight())));
}
public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 818bf760047..d84679eaf92 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -18,6 +18,7 @@
package org.apache.hudi
+import org.apache.avro.Schema.Type
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
@@ -242,4 +243,12 @@ object AvroConversionUtils {
val nameParts = qualifiedName.split('.')
(nameParts.last, nameParts.init.mkString("."))
}
+
+ private def handleUnion(schema: Schema): Schema = {
+ if (schema.getType == Type.UNION) {
+ val index = if (schema.getTypes.get(0).getType == Schema.Type.NULL) 1
else 0
+ return schema.getTypes.get(index)
+ }
+ schema
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index fcfc8a4f0b9..3c5486c47c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -249,6 +249,11 @@ public class AvroSchemaUtils {
}
List<Schema> innerTypes = schema.getTypes();
+ if (innerTypes.size() == 2 && isNullable(schema)) {
+ // this is a basic nullable field so handle it more efficiently
+ return resolveNullableSchema(schema);
+ }
+
Schema nonNullType =
innerTypes.stream()
.filter(it -> it.getType() != Schema.Type.NULL &&
Objects.equals(it.getFullName(), fieldSchemaFullName))
@@ -286,18 +291,19 @@ public class AvroSchemaUtils {
}
List<Schema> innerTypes = schema.getTypes();
- Schema nonNullType =
- innerTypes.stream()
- .filter(it -> it.getType() != Schema.Type.NULL)
- .findFirst()
- .orElse(null);
- if (innerTypes.size() != 2 || nonNullType == null) {
+ if (innerTypes.size() != 2) {
throw new AvroRuntimeException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
}
-
- return nonNullType;
+ Schema firstInnerType = innerTypes.get(0);
+ Schema secondInnerType = innerTypes.get(1);
+ if ((firstInnerType.getType() != Schema.Type.NULL &&
secondInnerType.getType() != Schema.Type.NULL)
+ || (firstInnerType.getType() == Schema.Type.NULL &&
secondInnerType.getType() == Schema.Type.NULL)) {
+ throw new AvroRuntimeException(
+ String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
+ }
+ return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType :
firstInnerType;
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 54c37b333e7..3800d9c1053 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -265,7 +265,8 @@ public class HoodieAvroUtils {
* @param withOperationField Whether to include the '_hoodie_operation' field
*/
public static Schema addMetadataFields(Schema schema, boolean
withOperationField) {
- List<Schema.Field> parentFields = new ArrayList<>();
+ int newFieldsSize = HoodieRecord.HOODIE_META_COLUMNS.size() +
(withOperationField ? 1 : 0);
+ List<Schema.Field> parentFields = new
ArrayList<>(schema.getFields().size() + newFieldsSize);
Schema.Field commitTimeField =
new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
@@ -439,12 +440,6 @@ public class HoodieAvroUtils {
copyOldValueOrSetDefault(oldRecord, newRecord, f);
}
}
-
- if (!ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) {
- throw new SchemaCompatibilityException(
- "Unable to validate the rewritten record " + oldRecord + " against
schema " + newSchema);
- }
-
return newRecord;
}
@@ -455,10 +450,6 @@ public class HoodieAvroUtils {
}
// do not preserve FILENAME_METADATA_FIELD
newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName);
- if (!GenericData.get().validate(newSchema, newRecord)) {
- throw new SchemaCompatibilityException(
- "Unable to validate the rewritten record " + genericRecord + "
against schema " + newSchema);
- }
return newRecord;
}
@@ -494,7 +485,7 @@ public class HoodieAvroUtils {
private static void copyOldValueOrSetDefault(GenericRecord oldRecord,
GenericRecord newRecord, Schema.Field field) {
Schema oldSchema = oldRecord.getSchema();
Field oldSchemaField = oldSchema.getField(field.name());
- Object fieldValue = oldSchemaField == null ? null :
oldRecord.get(field.name());
+ Object fieldValue = oldSchemaField == null ? null :
oldRecord.get(oldSchemaField.pos());
if (fieldValue != null) {
// In case field's value is a nested record, we have to rewrite it as
well
@@ -508,11 +499,14 @@ public class HoodieAvroUtils {
} else {
newFieldValue = fieldValue;
}
- newRecord.put(field.name(), newFieldValue);
+ newRecord.put(field.pos(), newFieldValue);
} else if (field.defaultVal() instanceof JsonProperties.Null) {
- newRecord.put(field.name(), null);
+ newRecord.put(field.pos(), null);
} else {
- newRecord.put(field.name(), field.defaultVal());
+ if (!isNullable(field.schema()) && field.defaultVal() == null) {
+ throw new SchemaCompatibilityException("Field " + field.name() + " has
no default value and is null in old record");
+ }
+ newRecord.put(field.pos(), field.defaultVal());
}
}
@@ -562,7 +556,8 @@ public class HoodieAvroUtils {
* it is consistent with avro after 1.10
*/
public static Object getFieldVal(GenericRecord record, String key, boolean
returnNullIfNotFound) {
- if (record.getSchema().getField(key) == null) {
+ Schema.Field field = record.getSchema().getField(key);
+ if (field == null) {
if (returnNullIfNotFound) {
return null;
} else {
@@ -572,7 +567,7 @@ public class HoodieAvroUtils {
throw new AvroRuntimeException("Not a valid schema field: " + key);
}
} else {
- return record.get(key);
+ return record.get(field.pos());
}
}
@@ -874,7 +869,8 @@ public class HoodieAvroUtils {
}
// try to get real schema for union type
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
- Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord,
oldSchema, newSchema, renameCols, fieldNames, validate);
+ Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord,
oldSchema, newSchema, renameCols, fieldNames);
+ // validation is recursive so it only needs to be called on the original
input
if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema,
newRecord)) {
throw new SchemaCompatibilityException(
"Unable to validate the rewritten record " + oldRecord + " against
schema " + newSchema);
@@ -882,7 +878,7 @@ public class HoodieAvroUtils {
return newRecord;
}
- private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord,
Schema oldSchema, Schema newSchema, Map<String, String> renameCols,
Deque<String> fieldNames, boolean validate) {
+ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord,
Schema oldSchema, Schema newSchema, Map<String, String> renameCols,
Deque<String> fieldNames) {
switch (newSchema.getType()) {
case RECORD:
ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord,
"cannot rewrite record with different type");
@@ -893,17 +889,17 @@ public class HoodieAvroUtils {
Schema.Field field = fields.get(i);
String fieldName = field.name();
fieldNames.push(fieldName);
- if (oldSchema.getField(field.name()) != null &&
!renameCols.containsKey(field.name())) {
- Schema.Field oldField = oldSchema.getField(field.name());
- newRecord.put(i,
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()),
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
+ Schema.Field oldField = oldSchema.getField(field.name());
+ if (oldField != null && !renameCols.containsKey(field.name())) {
+ newRecord.put(i,
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()),
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, false));
} else {
String fieldFullName = createFullName(fieldNames);
- String fieldNameFromOldSchema =
renameCols.getOrDefault(fieldFullName, "");
+ String fieldNameFromOldSchema = renameCols.get(fieldFullName);
// deal with rename
- if (oldSchema.getField(fieldNameFromOldSchema) != null) {
+ Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ?
null : oldSchema.getField(fieldNameFromOldSchema);
+ if (oldFieldRenamed != null) {
// find rename
- Schema.Field oldField =
oldSchema.getField(fieldNameFromOldSchema);
- newRecord.put(i,
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()),
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
+ newRecord.put(i,
rewriteRecordWithNewSchema(indexedRecord.get(oldFieldRenamed.pos()),
oldFieldRenamed.schema(), fields.get(i).schema(), renameCols, fieldNames,
false));
} else {
// deal with default value
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
@@ -927,25 +923,25 @@ public class HoodieAvroUtils {
case ARRAY:
ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot
rewrite record with different type");
Collection array = (Collection) oldRecord;
- List<Object> newArray = new ArrayList(array.size());
+ List<Object> newArray = new ArrayList<>(array.size());
fieldNames.push("element");
for (Object element : array) {
- newArray.add(rewriteRecordWithNewSchema(element,
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames,
validate));
+ newArray.add(rewriteRecordWithNewSchema(element,
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames,
false));
}
fieldNames.pop();
return newArray;
case MAP:
ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot
rewrite record with different type");
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
- Map<Object, Object> newMap = new HashMap<>(map.size(), 1);
+ Map<Object, Object> newMap = new HashMap<>(map.size(), 1.0f);
fieldNames.push("value");
for (Map.Entry<Object, Object> entry : map.entrySet()) {
- newMap.put(entry.getKey(),
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(),
newSchema.getValueType(), renameCols, fieldNames, validate));
+ newMap.put(entry.getKey(),
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(),
newSchema.getValueType(), renameCols, fieldNames, false));
}
fieldNames.pop();
return newMap;
case UNION:
- return rewriteRecordWithNewSchema(oldRecord,
getActualSchemaFromUnion(oldSchema, oldRecord),
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames,
validate);
+ return rewriteRecordWithNewSchema(oldRecord,
getActualSchemaFromUnion(oldSchema, oldRecord),
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames, false);
default:
return rewritePrimaryType(oldRecord, oldSchema, newSchema);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 954fe75a0ac..770c811a2e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -87,6 +87,8 @@ public class FSUtils {
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
+ private static final String LOG_FILE_EXTENSION = ".log";
+
private static final PathFilter ALLOW_ALL_FILTER = file -> true;
public static Configuration prepareHadoopConf(Configuration conf) {
@@ -474,8 +476,11 @@ public class FSUtils {
}
public static boolean isLogFile(String fileName) {
- Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
- return fileName.contains(".log") && matcher.find();
+ if (fileName.contains(LOG_FILE_EXTENSION)) {
+ Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
+ return matcher.find();
+ }
+ return false;
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
index 70d74f0a0ce..5cd6ac3954e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -30,146 +30,150 @@ import org.scalatest.{FunSuite, Matchers}
class TestAvroConversionUtils extends FunSuite with Matchers {
- test("test convertStructTypeToAvroSchema") {
- val mapType = DataTypes.createMapType(StringType, new
StructType().add("mapKey", "string", false).add("mapVal", "integer", true))
- val arrayType = ArrayType(new StructType().add("arrayKey", "string",
false).add("arrayVal", "integer", true))
- val innerStruct = new
StructType().add("innerKey","string",false).add("value", "long", true)
-
- val struct = new StructType().add("key", "string", false).add("version",
"string", true)
- .add("data1",innerStruct,false).add("data2",innerStruct,true)
- .add("nullableMap", mapType, true).add("map",mapType,false)
- .add("nullableArray", arrayType, true).add("array",arrayType,false)
-
- val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct,
"SchemaName", "SchemaNS")
-
- val expectedSchemaStr = s"""
- {
- "type" : "record",
- "name" : "SchemaName",
- "namespace" : "SchemaNS",
- "fields" : [ {
- "name" : "key",
- "type" : "string"
- }, {
- "name" : "version",
- "type" : [ "null", "string" ],
- "default" : null
- }, {
+ val complexSchemaStr =
+ s"""
+ {
+ "type" : "record",
+ "name" : "SchemaName",
+ "namespace" : "SchemaNS",
+ "fields" : [ {
+ "name" : "key",
+ "type" : "string"
+ }, {
+ "name" : "version",
+ "type" : [ "null", "string" ],
+ "default" : null
+ }, {
+ "name" : "data1",
+ "type" : {
+ "type" : "record",
"name" : "data1",
- "type" : {
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "innerKey",
+ "type" : "string"
+ }, {
+ "name" : "value",
+ "type" : [ "null", "long" ],
+ "default" : null
+ } ]
+ }
+ }, {
+ "name" : "data2",
+ "type" : [ "null", {
+ "type" : "record",
+ "name" : "data2",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "innerKey",
+ "type" : "string"
+ }, {
+ "name" : "value",
+ "type" : [ "null", "long" ],
+ "default" : null
+ } ]
+ } ],
+ "default" : null
+ }, {
+ "name" : "nullableMap",
+ "type" : [ "null", {
+ "type" : "map",
+ "values" : [
+ "null",
+ {
"type" : "record",
- "name" : "data1",
+ "name" : "nullableMap",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
- "name" : "innerKey",
+ "name" : "mapKey",
"type" : "string"
}, {
- "name" : "value",
- "type" : [ "null", "long" ],
+ "name" : "mapVal",
+ "type" : [ "null", "int" ],
"default" : null
} ]
- }
- }, {
- "name" : "data2",
- "type" : [ "null", {
+ } ]
+ } ],
+ "default" : null
+ }, {
+ "name" : "map",
+ "type" : {
+ "type" : "map",
+ "values" : [
+ "null",
+ {
"type" : "record",
- "name" : "data2",
+ "name" : "map",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
- "name" : "innerKey",
+ "name" : "mapKey",
"type" : "string"
}, {
- "name" : "value",
- "type" : [ "null", "long" ],
+ "name" : "mapVal",
+ "type" : [ "null", "int" ],
"default" : null
} ]
- } ],
- "default" : null
- }, {
- "name" : "nullableMap",
- "type" : [ "null", {
- "type" : "map",
- "values" : [
- "null",
- {
- "type" : "record",
- "name" : "nullableMap",
- "namespace" : "SchemaNS.SchemaName",
- "fields" : [ {
- "name" : "mapKey",
- "type" : "string"
- }, {
- "name" : "mapVal",
- "type" : [ "null", "int" ],
- "default" : null
- } ]
- } ]
- } ],
- "default" : null
- }, {
- "name" : "map",
- "type" : {
- "type" : "map",
- "values" : [
- "null",
- {
- "type" : "record",
- "name" : "map",
- "namespace" : "SchemaNS.SchemaName",
- "fields" : [ {
- "name" : "mapKey",
- "type" : "string"
- }, {
- "name" : "mapVal",
- "type" : [ "null", "int" ],
- "default" : null
- } ]
- } ]
- }
- }, {
- "name" : "nullableArray",
- "type" : [ "null", {
- "type" : "array",
- "items" : [
- "null",
- {
- "type" : "record",
- "name" : "nullableArray",
- "namespace" : "SchemaNS.SchemaName",
- "fields" : [ {
- "name" : "arrayKey",
- "type" : "string"
- }, {
- "name" : "arrayVal",
- "type" : [ "null", "int" ],
- "default" : null
- } ]
+ } ]
+ }
+ }, {
+ "name" : "nullableArray",
+ "type" : [ "null", {
+ "type" : "array",
+ "items" : [
+ "null",
+ {
+ "type" : "record",
+ "name" : "nullableArray",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "arrayKey",
+ "type" : "string"
+ }, {
+ "name" : "arrayVal",
+ "type" : [ "null", "int" ],
+ "default" : null
} ]
- } ],
- "default" : null
- }, {
- "name" : "array",
- "type" : {
- "type" : "array",
- "items" : [
- "null",
- {
- "type" : "record",
- "name" : "array",
- "namespace" : "SchemaNS.SchemaName",
- "fields" : [ {
- "name" : "arrayKey",
- "type" : "string"
- }, {
- "name" : "arrayVal",
- "type" : [ "null", "int" ],
- "default" : null
- } ]
+ } ]
+ } ],
+ "default" : null
+ }, {
+ "name" : "array",
+ "type" : {
+ "type" : "array",
+ "items" : [
+ "null",
+ {
+ "type" : "record",
+ "name" : "array",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "arrayKey",
+ "type" : "string"
+ }, {
+ "name" : "arrayVal",
+ "type" : [ "null", "int" ],
+ "default" : null
} ]
- }
- } ]
- }
+ } ]
+ }
+ } ]
+ }
"""
+
+
+ test("test convertStructTypeToAvroSchema_orig") {
+ val mapType = DataTypes.createMapType(StringType, new
StructType().add("mapKey", "string", false).add("mapVal", "integer", true))
+ val arrayType = ArrayType(new StructType().add("arrayKey", "string",
false).add("arrayVal", "integer", true))
+ val innerStruct = new StructType().add("innerKey", "string",
false).add("value", "long", true)
+
+ val struct = new StructType().add("key", "string", false).add("version",
"string", true)
+ .add("data1", innerStruct, false).add("data2", innerStruct, true)
+ .add("nullableMap", mapType, true).add("map", mapType, false)
+ .add("nullableArray", arrayType, true).add("array", arrayType, false)
+
+ val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct,
"SchemaName", "SchemaNS")
+
+ val expectedSchemaStr = complexSchemaStr
val expectedAvroSchema = new Schema.Parser().parse(expectedSchemaStr)
assert(avroSchema.equals(expectedAvroSchema))