This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ca768efa25 NIFI-15448 Add option for using Predefined Schemas in 
GenerateRecord (#10752)
ca768efa25 is described below

commit ca768efa2540aed6bbf649a107d45424264e9cac
Author: Pierre Villard <[email protected]>
AuthorDate: Sun Feb 1 03:58:24 2026 +0100

    NIFI-15448 Add option for using Predefined Schemas in GenerateRecord 
(#10752)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/processors/standard/GenerateRecord.java   | 150 +++-
 .../standard/faker/PredefinedRecordSchema.java     | 757 +++++++++++++++++++++
 .../processors/standard/TestGenerateRecord.java    | 128 ++++
 3 files changed, 1005 insertions(+), 30 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
index d355ee58ae..818d7679c8 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
@@ -31,6 +31,8 @@ import org.apache.nifi.avro.AvroSchemaValidator;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -42,6 +44,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.faker.FakerUtils;
+import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -65,6 +68,7 @@ import java.time.LocalDate;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -84,9 +88,12 @@ import static 
org.apache.nifi.processors.standard.faker.FakerUtils.DEFAULT_DATE_
         @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
         @WritesAttribute(attribute = "record.count", description = "The number 
of records in the FlowFile"),
 })
-@CapabilityDescription("This processor creates FlowFiles with records having 
random value for the specified fields. GenerateRecord is useful " +
-        "for testing, configuration, and simulation. It uses either 
user-defined properties to define a record schema or a provided schema and 
generates the specified number of records using " +
-        "random data for the fields in the schema.")
+@CapabilityDescription("""
+        This processor creates FlowFiles with records having random value for 
the specified fields. GenerateRecord is useful
+        for testing, configuration, and simulation. It uses one of three 
methods to define a record schema: (1) a provided Avro Schema Text,
+        (2) a Predefined Schema template such as Person, Order, Event, Sensor, 
Product, Stock Trade, or Complete Example covering all data types,
+        or (3) user-defined dynamic properties. The processor generates the 
specified number of records using random data for the fields in the schema.
+        """)
 @DynamicProperties({
         @DynamicProperty(
                 name = "Field name in generated record",
@@ -122,16 +129,21 @@ public class GenerateRecord extends AbstractProcessor {
 
     static final PropertyDescriptor NULLABLE_FIELDS = new 
PropertyDescriptor.Builder()
             .name("Nullable Fields")
-            .description("Whether the generated fields will be nullable. Note 
that this property is ignored if Schema Text is set. Also it only affects the 
schema of the generated data, " +
-                    "not whether any values will be null. If this property is 
true, see 'Null Value Percentage' to set the probability that any generated 
field will be null.")
+            .description("""
+                    Whether the generated fields will be nullable. Note that 
this property is ignored if Schema Text is set.
+                    Also it only affects the schema of the generated data, not 
whether any values will be null.
+                    If this property is true, see 'Null Value Percentage' to 
set the probability that any generated field will be null.
+                    """)
             .allowableValues("true", "false")
             .defaultValue("true")
             .required(true)
             .build();
     static final PropertyDescriptor NULL_PERCENTAGE = new 
PropertyDescriptor.Builder()
             .name("Null Value Percentage")
-            .description("The percent probability (0-100%) that a generated 
value for any nullable field will be null. Set this property to zero to have no 
null values, or 100 to have all " +
-                    "null values.")
+            .description("""
+                    The percent probability (0-100%) that a generated value 
for any nullable field will be null.
+                    Set this property to zero to have no null values, or 100 
to have all null values.
+                    """)
             .addValidator(StandardValidators.createLongValidator(0L, 100L, 
true))
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .required(true)
@@ -141,18 +153,34 @@ public class GenerateRecord extends AbstractProcessor {
 
     static final PropertyDescriptor SCHEMA_TEXT = new 
PropertyDescriptor.Builder()
             .name("Schema Text")
-            .description("The text of an Avro-formatted Schema used to 
generate record data. If this property is set, any user-defined properties are 
ignored.")
+            .description("""
+                    The text of an Avro-formatted Schema used to generate 
record data.
+                    Only one of Schema Text, Predefined Schema, or 
user-defined dynamic properties should be configured.
+                    """)
             .addValidator(new AvroSchemaValidator())
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .required(false)
             .build();
 
+    // Optional property whose allowable values are the PredefinedRecordSchema enum constants.
+    // customValidate() rejects configurations that combine it with Schema Text or dynamic properties.
+    static final PropertyDescriptor PREDEFINED_SCHEMA = new 
PropertyDescriptor.Builder()
+            .name("Predefined Schema")
+            .description("""
+                    Select a predefined schema template for quick record 
generation. Predefined schemas provide ready-to-use
+                    templates with multiple fields covering various data types 
including nested records, arrays, maps, dates, timestamps, etc.
+                    Only one of Schema Text, Predefined Schema, or 
user-defined dynamic properties should be configured.
+                    Note: This feature is intended for quick testing purposes 
only. Predefined schemas may change between NiFi versions.
+                    """)
+            .allowableValues(PredefinedRecordSchema.class)
+            .required(false)
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
             RECORD_WRITER,
             NUM_RECORDS,
             NULLABLE_FIELDS,
             NULL_PERCENTAGE,
-            SCHEMA_TEXT
+            SCHEMA_TEXT,
+            PREDEFINED_SCHEMA
     );
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -188,6 +216,46 @@ public class GenerateRecord extends AbstractProcessor {
         return RELATIONSHIPS;
     }
 
+    /**
+     * Verifies that exactly one schema source is configured: Schema Text, Predefined Schema, or dynamic properties.
+     *
+     * @param validationContext context providing the configured property values
+     * @return a single invalid result when zero or several sources are configured, otherwise an empty collection
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        // Collect the display name of every schema source that is currently configured
+        final List<String> configuredSources = new ArrayList<>();
+        if (validationContext.getProperty(SCHEMA_TEXT).isSet()) {
+            configuredSources.add("Schema Text");
+        }
+        if (validationContext.getProperty(PREDEFINED_SCHEMA).isSet()) {
+            configuredSources.add("Predefined Schema");
+        }
+        if (validationContext.getProperties().keySet().stream().anyMatch(PropertyDescriptor::isDynamic)) {
+            configuredSources.add("Dynamic Properties");
+        }
+
+        if (configuredSources.isEmpty()) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Schema Configuration")
+                    .valid(false)
+                    .explanation("At least one schema configuration must be provided: Schema Text, Predefined Schema, or user-defined dynamic properties")
+                    .build());
+        } else if (configuredSources.size() > 1) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Schema Configuration")
+                    .valid(false)
+                    // Join the names so the message never ends with a dangling separator
+                    .explanation("Only one schema configuration should be provided. Found multiple configurations: " + String.join(", ", configuredSources))
+                    .build());
+        }
+
+        return results;
+    }
+
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         // Force the en-US Locale for more predictable results
@@ -198,6 +266,8 @@ public class GenerateRecord extends AbstractProcessor {
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
 
         final String schemaText = 
context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions().getValue();
+        final String predefinedSchemaName = 
context.getProperty(PREDEFINED_SCHEMA).getValue();
+        final PredefinedRecordSchema predefinedSchema = 
PredefinedRecordSchema.fromName(predefinedSchemaName);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         final int numRecords = 
context.getProperty(NUM_RECORDS).evaluateAttributeExpressions().asInteger();
         final boolean nullable = 
context.getProperty(NULLABLE_FIELDS).asBoolean();
@@ -210,16 +280,21 @@ public class GenerateRecord extends AbstractProcessor {
         try {
             flowFile = session.write(flowFile, out -> {
                 final RecordSchema recordSchema;
-                final boolean usingSchema;
+                final SchemaSource schemaSource;
                 if (StringUtils.isNotEmpty(schemaText)) {
+                    // Schema Text takes highest precedence
                     final Schema avroSchema = new 
Schema.Parser().parse(schemaText);
                     recordSchema = AvroTypeUtil.createSchema(avroSchema);
-                    usingSchema = true;
+                    schemaSource = SchemaSource.SCHEMA_TEXT;
+                } else if (predefinedSchema != null) {
+                    // Predefined schema takes second precedence
+                    recordSchema = predefinedSchema.getSchema(nullable);
+                    schemaSource = SchemaSource.PREDEFINED;
                 } else {
                     // Generate RecordSchema from user-defined properties
                     final Map<String, String> fields = getFields(context);
                     recordSchema = generateRecordSchema(fields, nullable);
-                    usingSchema = false;
+                    schemaSource = SchemaSource.DYNAMIC_PROPERTIES;
                 }
                 try {
                     final RecordSchema writeSchema = 
writerFactory.getSchema(attributes, recordSchema);
@@ -227,29 +302,35 @@ public class GenerateRecord extends AbstractProcessor {
                         writer.beginRecordSet();
 
                         Record record;
-                        List<RecordField> writeFieldNames = 
writeSchema.getFields();
-                        Map<String, Object> recordEntries = new HashMap<>();
                         for (int i = 0; i < numRecords; i++) {
-                            for (RecordField writeRecordField : 
writeFieldNames) {
-                                final String writeFieldName = 
writeRecordField.getFieldName();
-                                final Object writeFieldValue;
-                                if (usingSchema) {
-                                    writeFieldValue = 
generateValueFromRecordField(writeRecordField, faker, nullPercentage);
-                                } else {
-                                    final boolean nullValue =
-                                            nullPercentage > 0 && 
faker.number().numberBetween(0, 100) <= nullPercentage;
-
-                                    if (nullValue) {
-                                        writeFieldValue = null;
+                            if (schemaSource == SchemaSource.PREDEFINED) {
+                                // Use the predefined schema's optimized value 
generation
+                                final Map<String, Object> recordEntries = 
predefinedSchema.generateValues(faker, recordSchema, nullPercentage);
+                                record = new MapRecord(recordSchema, 
recordEntries);
+                            } else {
+                                // Use original logic for Schema Text or 
dynamic properties
+                                List<RecordField> writeFieldNames = 
writeSchema.getFields();
+                                Map<String, Object> recordEntries = new 
HashMap<>();
+                                for (RecordField writeRecordField : 
writeFieldNames) {
+                                    final String writeFieldName = 
writeRecordField.getFieldName();
+                                    final Object writeFieldValue;
+                                    if (schemaSource == 
SchemaSource.SCHEMA_TEXT) {
+                                        writeFieldValue = 
generateValueFromRecordField(writeRecordField, faker, nullPercentage);
                                     } else {
-                                        final String propertyValue = 
context.getProperty(writeFieldName).getValue();
-                                        writeFieldValue = 
FakerUtils.getFakeData(propertyValue, faker);
+                                        final boolean nullValue =
+                                                nullPercentage > 0 && 
faker.number().numberBetween(0, 100) <= nullPercentage;
+
+                                        if (nullValue) {
+                                            writeFieldValue = null;
+                                        } else {
+                                            final String propertyValue = 
context.getProperty(writeFieldName).getValue();
+                                            writeFieldValue = 
FakerUtils.getFakeData(propertyValue, faker);
+                                        }
                                     }
+                                    recordEntries.put(writeFieldName, 
writeFieldValue);
                                 }
-
-                                recordEntries.put(writeFieldName, 
writeFieldValue);
+                                record = new MapRecord(recordSchema, 
recordEntries);
                             }
-                            record = new MapRecord(recordSchema, 
recordEntries);
                             writer.write(record);
                         }
 
@@ -403,4 +484,13 @@ public class GenerateRecord extends AbstractProcessor {
         }
         return new SimpleRecordSchema(recordFields);
     }
+
+    /**
+     * Enum to track which source is being used for the record schema,
+     * so that onTrigger can pick the matching value-generation path.
+     */
+    private enum SchemaSource {
+        /** Schema parsed from the Schema Text property */
+        SCHEMA_TEXT,
+        /** Schema supplied by the selected Predefined Schema */
+        PREDEFINED,
+        /** Schema generated from the user-defined dynamic properties */
+        DYNAMIC_PROPERTIES
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java
new file mode 100644
index 0000000000..965d9ce4e3
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.faker;
+
+import net.datafaker.Faker;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Predefined record schemas for the GenerateRecord processor.
+ * These schemas provide ready-to-use templates for generating fake data
+ * without requiring manual configuration of dynamic properties.
+ */
+public enum PredefinedRecordSchema implements DescribedValue {
+
+    PERSON("Person", "A person with name, contact information, and address 
(schema.org/Person)") {
+        /**
+         * Builds the Person schema, including a nested PostalAddress record.
+         *
+         * @param nullable nullability flag applied to every field, including the nested address fields
+         * @return record schema describing a person
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType stringType = RecordFieldType.STRING.getDataType();
+
+            // Nested record following schema.org/PostalAddress
+            final RecordSchema postalAddressSchema = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("streetAddress", stringType, nullable),
+                    new RecordField("addressLocality", stringType, nullable),
+                    new RecordField("addressRegion", stringType, nullable),
+                    new RecordField("postalCode", stringType, nullable),
+                    new RecordField("addressCountry", stringType, nullable)
+            ));
+
+            // Top-level record following schema.org/Person
+            return new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("givenName", stringType, nullable),
+                    new RecordField("familyName", stringType, nullable),
+                    new RecordField("email", stringType, nullable),
+                    new RecordField("telephone", stringType, nullable),
+                    new RecordField("birthDate", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("age", RecordFieldType.INT.getDataType(), nullable),
+                    new RecordField("active", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("address", RecordFieldType.RECORD.getRecordDataType(postalAddressSchema), nullable)
+            ));
+        }
+
+        /**
+         * Produces one set of field values for a Person record.
+         * Every value is obtained through generateNullableValue (declared elsewhere in this enum),
+         * which presumably applies the null percentage - confirm against that helper.
+         *
+         * @param faker source of the fake data
+         * @param schema schema whose "address" field supplies the nested record schema
+         * @param nullPercentage value forwarded to generateNullableValue for every field
+         * @return field name to generated value, in insertion order
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            final Map<String, Object> person = new LinkedHashMap<>();
+            person.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+            person.put("givenName", generateNullableValue(nullPercentage, faker, f -> f.name().firstName()));
+            person.put("familyName", generateNullableValue(nullPercentage, faker, f -> f.name().lastName()));
+            person.put("email", generateNullableValue(nullPercentage, faker, f -> f.internet().emailAddress()));
+            person.put("telephone", generateNullableValue(nullPercentage, faker, f -> f.phoneNumber().phoneNumber()));
+            person.put("birthDate", generateNullableValue(nullPercentage, faker, f -> Date.valueOf(f.timeAndDate().birthday(18, 80))));
+            person.put("age", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(18, 80)));
+            person.put("active", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+
+            // Values of the nested PostalAddress record are prepared before the address field itself is resolved
+            final Map<String, Object> postalAddress = new LinkedHashMap<>();
+            postalAddress.put("streetAddress", generateNullableValue(nullPercentage, faker, f -> f.address().streetAddress()));
+            postalAddress.put("addressLocality", generateNullableValue(nullPercentage, faker, f -> f.address().city()));
+            postalAddress.put("addressRegion", generateNullableValue(nullPercentage, faker, f -> f.address().state()));
+            postalAddress.put("postalCode", generateNullableValue(nullPercentage, faker, f -> f.address().zipCode()));
+            postalAddress.put("addressCountry", generateNullableValue(nullPercentage, faker, f -> f.address().country()));
+
+            // The address is only attached when the supplied schema declares it as a record with a child schema
+            final DataType addressType = schema.getField("address").orElseThrow().getDataType();
+            if (addressType.getFieldType() == RecordFieldType.RECORD) {
+                final RecordSchema addressSchema = ((RecordDataType) addressType).getChildSchema();
+                if (addressSchema != null) {
+                    person.put("address", generateNullableValue(nullPercentage, faker, f -> new MapRecord(addressSchema, postalAddress)));
+                }
+            }
+
+            return person;
+        }
+    },
+
+    ORDER("Order", "An e-commerce order with line items, amounts, and 
timestamps (schema.org/Order)") {
+        /**
+         * Builds the Order schema, including an array of nested OrderItem records.
+         *
+         * @param nullable nullability flag applied to every field, including the nested item fields
+         * @return record schema describing an order
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType stringType = RecordFieldType.STRING.getDataType();
+            final DataType uuidType = RecordFieldType.UUID.getDataType();
+            final DataType intType = RecordFieldType.INT.getDataType();
+
+            // Element record following schema.org/OrderItem
+            final RecordSchema orderItemSchema = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("identifier", stringType, nullable),
+                    new RecordField("name", stringType, nullable),
+                    new RecordField("orderQuantity", intType, nullable),
+                    new RecordField("price", RecordFieldType.DOUBLE.getDataType(), nullable)
+            ));
+            final DataType orderItemArrayType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(orderItemSchema));
+
+            // Top-level record following schema.org/Order
+            return new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("orderNumber", uuidType, nullable),
+                    new RecordField("customer", uuidType, nullable),
+                    new RecordField("customerName", stringType, nullable),
+                    new RecordField("customerEmail", stringType, nullable),
+                    new RecordField("orderDate", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("orderTime", RecordFieldType.TIME.getDataType(), nullable),
+                    new RecordField("orderDelivery", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("totalPrice", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable),
+                    new RecordField("priceCurrency", stringType, nullable),
+                    new RecordField("orderStatus", stringType, nullable),
+                    new RecordField("isGift", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("itemCount", intType, nullable),
+                    new RecordField("orderedItem", orderItemArrayType, nullable)
+            ));
+        }
+
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema 
schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+            values.put("orderNumber", generateNullableValue(nullPercentage, 
faker, f -> UUID.randomUUID()));
+            values.put("customer", generateNullableValue(nullPercentage, 
faker, f -> UUID.randomUUID()));
+            values.put("customerName", generateNullableValue(nullPercentage, 
faker, f -> f.name().fullName()));
+            values.put("customerEmail", generateNullableValue(nullPercentage, 
faker, f -> f.internet().emailAddress()));
+
+            Instant orderInstant = faker.timeAndDate().past(365, 
TimeUnit.DAYS);
+            values.put("orderDate", generateNullableValue(nullPercentage, 
faker, f -> new Date(orderInstant.toEpochMilli())));
+            values.put("orderTime", generateNullableValue(nullPercentage, 
faker, f -> new Time(orderInstant.toEpochMilli())));
+            values.put("orderDelivery", generateNullableValue(nullPercentage, 
faker, f -> new Timestamp(orderInstant.toEpochMilli())));
+
+            // OrderStatus values per schema.org/OrderStatus
+            String[] statuses = {"OrderCancelled", "OrderDelivered", 
"OrderInTransit", "OrderPaymentDue",
+                    "OrderPickupAvailable", "OrderProblem", "OrderProcessing", 
"OrderReturned"};
+            String status = statuses[faker.number().numberBetween(0, 
statuses.length)];
+            values.put("orderStatus", generateNullableValue(nullPercentage, 
faker, f -> status));
+            values.put("isGift", generateNullableValue(nullPercentage, faker, 
f -> f.bool().bool()));
+
+            // Use Faker's money provider for currency codes
+            values.put("priceCurrency", generateNullableValue(nullPercentage, 
faker, f -> f.money().currencyCode()));
+
+            // Generate ordered items
+            int itemCount = faker.number().numberBetween(1, 5);
+            values.put("itemCount", generateNullableValue(nullPercentage, 
faker, f -> itemCount));
+
+            RecordSchema orderedItemSchema = null;
+            if 
(schema.getField("orderedItem").get().getDataType().getFieldType() == 
RecordFieldType.ARRAY) {
+                DataType elementType = ((ArrayDataType) 
schema.getField("orderedItem").get().getDataType()).getElementType();
+                if (elementType.getFieldType() == RecordFieldType.RECORD) {
+                    orderedItemSchema = ((RecordDataType) 
elementType).getChildSchema();
+                }
+            }
+
+            double totalPrice = 0.0;
+            Object[] orderedItems = new Object[itemCount];
+            for (int i = 0; i < itemCount; i++) {
+                Map<String, Object> orderedItemValues = new LinkedHashMap<>();
+                orderedItemValues.put("identifier", "PRD-" + 
faker.number().digits(8));
+                orderedItemValues.put("name", faker.commerce().productName());
+                int quantity = faker.number().numberBetween(1, 10);
+                double price = faker.number().randomDouble(2, 10, 500);
+                orderedItemValues.put("orderQuantity", quantity);
+                orderedItemValues.put("price", price);
+                totalPrice += quantity * price;
+
+                if (orderedItemSchema != null) {
+                    orderedItems[i] = new MapRecord(orderedItemSchema, 
orderedItemValues);
+                }
+            }
+            values.put("orderedItem", generateNullableValue(nullPercentage, 
faker, f -> orderedItems));
+            final double finalTotal = totalPrice;
+            values.put("totalPrice", generateNullableValue(nullPercentage, 
faker, f -> BigDecimal.valueOf(finalTotal).setScale(2, RoundingMode.HALF_UP)));
+
+            return values;
+        }
+    },
+
+    EVENT("Event", "A timestamped event with metadata and keywords 
(schema.org/Event)") {
+        /**
+         * Builds the flat Event schema, which also carries a string array and a string-valued map.
+         *
+         * @param nullable nullability flag applied to every field
+         * @return record schema describing an event
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType stringType = RecordFieldType.STRING.getDataType();
+
+            // Field set following schema.org/Event
+            return new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("additionalType", stringType, nullable),
+                    new RecordField("startDate", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("startTime", RecordFieldType.TIME.getDataType(), nullable),
+                    new RecordField("endDate", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("organizer", stringType, nullable),
+                    new RecordField("eventStatus", stringType, nullable),
+                    new RecordField("description", stringType, nullable),
+                    new RecordField("isAccessibleForFree", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("attendeeCount", RecordFieldType.INT.getDataType(), nullable),
+                    new RecordField("duration", RecordFieldType.LONG.getDataType(), nullable),
+                    new RecordField("keywords", RecordFieldType.ARRAY.getArrayDataType(stringType), nullable),
+                    new RecordField("additionalProperty", RecordFieldType.MAP.getMapDataType(stringType), nullable)
+            ));
+        }
+
+        /**
+         * Generates one random value per Event field, keyed by field name in schema order.
+         * Each top-level value is independently replaced by null with the given percentage chance.
+         * The {@code schema} argument is not consulted because the Event schema has no nested records.
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+            values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+
+            // Use Faker's hacker provider for event type naming.
+            // Case conversion is pinned to Locale.ROOT so output does not depend on the JVM default locale.
+            values.put("additionalType", generateNullableValue(nullPercentage, faker, f ->
+                    f.hacker().verb().toUpperCase(java.util.Locale.ROOT) + "_" + f.hacker().noun().toUpperCase(java.util.Locale.ROOT)));
+
+            // One instant feeds the DATE, TIME and TIMESTAMP fields so the three stay consistent
+            Instant eventInstant = faker.timeAndDate().past(30, TimeUnit.DAYS);
+            values.put("startDate", generateNullableValue(nullPercentage, faker, f -> new Date(eventInstant.toEpochMilli())));
+            values.put("startTime", generateNullableValue(nullPercentage, faker, f -> new Time(eventInstant.toEpochMilli())));
+            values.put("endDate", generateNullableValue(nullPercentage, faker, f -> new Timestamp(eventInstant.toEpochMilli())));
+
+            // Use Faker's app provider for organizer names
+            values.put("organizer", generateNullableValue(nullPercentage, faker,
+                    f -> f.app().name().toLowerCase(java.util.Locale.ROOT).replace(" ", "-")));
+
+            // EventStatus values per schema.org/EventStatusType
+            String[] statuses = {"EventCancelled", "EventMovedOnline", "EventPostponed", "EventRescheduled", "EventScheduled"};
+            values.put("eventStatus", generateNullableValue(nullPercentage, faker, f -> statuses[f.number().numberBetween(0, statuses.length)]));
+            values.put("description", generateNullableValue(nullPercentage, faker, f -> f.lorem().sentence()));
+            values.put("isAccessibleForFree", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+            values.put("attendeeCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5)));
+            values.put("duration", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(1L, 10000L)));
+
+            // Generate keywords array using Faker's marketing buzzwords
+            int keywordCount = faker.number().numberBetween(1, 4);
+            String[] keywords = new String[keywordCount];
+            for (int i = 0; i < keywordCount; i++) {
+                keywords[i] = faker.marketing().buzzwords().toLowerCase(java.util.Locale.ROOT);
+            }
+            values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords));
+
+            // Generate additionalProperty map using Faker providers
+            Map<String, String> additionalProperty = new HashMap<>();
+            additionalProperty.put("version", faker.app().version());
+            additionalProperty.put("environment", faker.options().option("dev", "staging", "prod"));
+            // Use Faker's AWS provider for region
+            additionalProperty.put("region", faker.aws().region());
+            additionalProperty.put("correlationId", UUID.randomUUID().toString());
+            values.put("additionalProperty", generateNullableValue(nullPercentage, faker, f -> additionalProperty));
+
+            return values;
+        }
+    },
+
+    SENSOR("Sensor", "An IoT sensor reading with geo coordinates and 
measurements") {
+        /**
+         * Builds the Sensor schema: scalar readings plus a nested "geo" record.
+         * The nullable flag is applied to both the top-level and the nested fields.
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType text = RecordFieldType.STRING.getDataType();
+            final DataType decimalNumber = RecordFieldType.DOUBLE.getDataType();
+            final DataType wholeNumber = RecordFieldType.INT.getDataType();
+
+            // Nested coordinates record, field names per schema.org/GeoCoordinates
+            final RecordSchema coordinates = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("latitude", decimalNumber, nullable),
+                    new RecordField("longitude", decimalNumber, nullable),
+                    new RecordField("elevation", decimalNumber, nullable)
+            ));
+
+            final List<RecordField> sensorFields = Arrays.asList(
+                    new RecordField("identifier", text, nullable),
+                    new RecordField("additionalType", text, nullable),
+                    new RecordField("manufacturer", text, nullable),
+                    new RecordField("dateCreated", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("temperature", decimalNumber, nullable),
+                    new RecordField("humidity", decimalNumber, nullable),
+                    new RecordField("pressure", decimalNumber, nullable),
+                    new RecordField("batteryLevel", wholeNumber, nullable),
+                    new RecordField("signalStrength", wholeNumber, nullable),
+                    new RecordField("isActive", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("geo", RecordFieldType.RECORD.getRecordDataType(coordinates), nullable)
+            );
+            return new SimpleRecordSchema(sensorFields);
+        }
+
+        /**
+         * Generates one random value per Sensor field, keyed by field name.
+         * Each top-level value is independently replaced by null with the given percentage chance.
+         * The nested "geo" record is built against the child schema found in {@code schema};
+         * when {@code schema} has no RECORD-typed "geo" field the entry is omitted instead of failing.
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+            // Use Faker's device provider for serial number
+            values.put("identifier", generateNullableValue(nullPercentage, faker, f -> f.device().serial()));
+
+            // Use Faker's device provider for platform/type
+            values.put("additionalType", generateNullableValue(nullPercentage, faker, f -> f.device().platform()));
+
+            // Use Faker's device provider for manufacturer
+            values.put("manufacturer", generateNullableValue(nullPercentage, faker, f -> f.device().manufacturer()));
+
+            values.put("dateCreated", generateNullableValue(nullPercentage, faker,
+                    f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 3600000))));
+            values.put("temperature", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, -20, 45)));
+            values.put("humidity", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 0, 100)));
+            values.put("pressure", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 980, 1050)));
+            values.put("batteryLevel", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 100)));
+            values.put("signalStrength", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(-100, -30)));
+            values.put("isActive", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+
+            // Resolve the nested schema without Optional.get(), so a schema lacking a
+            // RECORD-typed "geo" field yields null here rather than NoSuchElementException.
+            final RecordSchema geoSchema = schema.getField("geo")
+                    .map(RecordField::getDataType)
+                    .filter(dataType -> dataType.getFieldType() == RecordFieldType.RECORD)
+                    .map(dataType -> ((RecordDataType) dataType).getChildSchema())
+                    .orElse(null);
+
+            if (geoSchema != null) {
+                // Generate nested geo record (GeoCoordinates); nested values are never nulled individually
+                Map<String, Object> geoValues = new LinkedHashMap<>();
+                geoValues.put("latitude", faker.number().randomDouble(6, -90, 90));
+                geoValues.put("longitude", faker.number().randomDouble(6, -180, 180));
+                geoValues.put("elevation", faker.number().randomDouble(2, 0, 3000));
+                values.put("geo", generateNullableValue(nullPercentage, faker, f -> new MapRecord(geoSchema, geoValues)));
+            }
+
+            return values;
+        }
+    },
+
+    PRODUCT("Product", "A product catalog entry with pricing and inventory 
(schema.org/Product)") {
+        /**
+         * Builds the Product schema: catalog attributes plus a nested "additionalProperty"
+         * record holding the physical dimensions. The nullable flag applies at both levels.
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType text = RecordFieldType.STRING.getDataType();
+            final DataType decimalNumber = RecordFieldType.DOUBLE.getDataType();
+            final DataType wholeNumber = RecordFieldType.INT.getDataType();
+
+            // Dimensions record, modelled on schema.org/QuantitativeValue
+            final RecordSchema dimensions = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("depth", decimalNumber, nullable),
+                    new RecordField("width", decimalNumber, nullable),
+                    new RecordField("height", decimalNumber, nullable),
+                    new RecordField("weight", decimalNumber, nullable)
+            ));
+
+            // Top-level fields per schema.org/Product
+            final List<RecordField> productFields = Arrays.asList(
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("sku", text, nullable),
+                    new RecordField("name", text, nullable),
+                    new RecordField("description", text, nullable),
+                    new RecordField("category", text, nullable),
+                    new RecordField("brand", text, nullable),
+                    new RecordField("price", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable),
+                    new RecordField("priceCurrency", text, nullable),
+                    new RecordField("availability", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("inventoryLevel", wholeNumber, nullable),
+                    new RecordField("ratingValue", decimalNumber, nullable),
+                    new RecordField("reviewCount", wholeNumber, nullable),
+                    new RecordField("dateCreated", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("dateModified", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("keywords", RecordFieldType.ARRAY.getArrayDataType(text), nullable),
+                    new RecordField("additionalProperty", RecordFieldType.RECORD.getRecordDataType(dimensions), nullable)
+            );
+            return new SimpleRecordSchema(productFields);
+        }
+
+        /**
+         * Generates one random value per Product field, keyed by field name.
+         * Each top-level value is independently replaced by null with the given percentage chance.
+         * "availability" is derived from the generated inventory level so the two agree when both are present.
+         * The nested dimensions record is built against the child schema found in {@code schema};
+         * when there is no RECORD-typed "additionalProperty" field the entry is omitted instead of failing.
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+            values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+            values.put("sku", generateNullableValue(nullPercentage, faker, f -> "SKU-" + f.number().digits(8)));
+            values.put("name", generateNullableValue(nullPercentage, faker, f -> f.commerce().productName()));
+            values.put("description", generateNullableValue(nullPercentage, faker, f -> f.lorem().paragraph()));
+            values.put("category", generateNullableValue(nullPercentage, faker, f -> f.commerce().department()));
+            // Use Faker's commerce provider for brand
+            values.put("brand", generateNullableValue(nullPercentage, faker, f -> f.commerce().brand()));
+            values.put("price", generateNullableValue(nullPercentage, faker,
+                    f -> BigDecimal.valueOf(f.number().randomDouble(2, 5, 2000)).setScale(2, RoundingMode.HALF_UP)));
+
+            // Use Faker's money provider for currency codes
+            values.put("priceCurrency", generateNullableValue(nullPercentage, faker, f -> f.money().currencyCode()));
+
+            int inventoryLevel = faker.number().numberBetween(0, 500);
+            values.put("availability", generateNullableValue(nullPercentage, faker, f -> inventoryLevel > 0));
+            values.put("inventoryLevel", generateNullableValue(nullPercentage, faker, f -> inventoryLevel));
+            values.put("ratingValue", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(1, 1, 5)));
+            values.put("reviewCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5000)));
+            values.put("dateCreated", generateNullableValue(nullPercentage, faker,
+                    f -> new Date(f.timeAndDate().past(365, TimeUnit.DAYS).toEpochMilli())));
+            values.put("dateModified", generateNullableValue(nullPercentage, faker,
+                    f -> new Timestamp(f.timeAndDate().past(30, TimeUnit.DAYS).toEpochMilli())));
+
+            // Generate keywords array using Faker's marketing buzzwords.
+            // Lower-casing is pinned to Locale.ROOT so output does not depend on the JVM default locale.
+            int keywordCount = faker.number().numberBetween(0, 4);
+            String[] keywords = new String[keywordCount];
+            for (int i = 0; i < keywordCount; i++) {
+                keywords[i] = faker.marketing().buzzwords().toLowerCase(java.util.Locale.ROOT);
+            }
+            values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords));
+
+            // Resolve the nested schema without Optional.get(), so a schema lacking a RECORD-typed
+            // "additionalProperty" field yields null here rather than NoSuchElementException.
+            final RecordSchema dimensionSchema = schema.getField("additionalProperty")
+                    .map(RecordField::getDataType)
+                    .filter(dataType -> dataType.getFieldType() == RecordFieldType.RECORD)
+                    .map(dataType -> ((RecordDataType) dataType).getChildSchema())
+                    .orElse(null);
+
+            if (dimensionSchema != null) {
+                // Generate nested additionalProperty record (dimensions)
+                Map<String, Object> dimensionValues = new LinkedHashMap<>();
+                dimensionValues.put("depth", faker.number().randomDouble(2, 1, 100));
+                dimensionValues.put("width", faker.number().randomDouble(2, 1, 100));
+                dimensionValues.put("height", faker.number().randomDouble(2, 1, 100));
+                dimensionValues.put("weight", faker.number().randomDouble(2, 1, 50));
+                values.put("additionalProperty",
+                        generateNullableValue(nullPercentage, faker, f -> new MapRecord(dimensionSchema, dimensionValues)));
+            }
+
+            return values;
+        }
+    },
+
+    STOCK_TRADE("Stock Trade", "A stock market trade with pricing and volume") 
{
+        /**
+         * Builds the flat Stock Trade schema; every field shares the requested nullability.
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            // Using schema.org naming conventions where applicable
+            final DataType text = RecordFieldType.STRING.getDataType();
+            final DataType bigNumber = RecordFieldType.LONG.getDataType();
+            // Quotes carry four decimal places; the trade total carries two
+            final DataType quote = RecordFieldType.DECIMAL.getDecimalDataType(12, 4);
+            final DataType total = RecordFieldType.DECIMAL.getDecimalDataType(16, 2);
+
+            final List<RecordField> tradeFields = Arrays.asList(
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("tickerSymbol", text, nullable),
+                    new RecordField("name", text, nullable),
+                    new RecordField("exchange", text, nullable),
+                    new RecordField("actionType", text, nullable),
+                    new RecordField("dateCreated", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("price", quote, nullable),
+                    new RecordField("orderQuantity", bigNumber, nullable),
+                    new RecordField("totalPrice", total, nullable),
+                    new RecordField("priceCurrency", text, nullable),
+                    new RecordField("bidPrice", quote, nullable),
+                    new RecordField("askPrice", quote, nullable),
+                    new RecordField("highPrice", quote, nullable),
+                    new RecordField("lowPrice", quote, nullable),
+                    new RecordField("marketCap", bigNumber, nullable),
+                    new RecordField("isSettled", RecordFieldType.BOOLEAN.getDataType(), nullable)
+            );
+            return new SimpleRecordSchema(tradeFields);
+        }
+
+        /**
+         * Generates one random value per Stock Trade field, keyed by field name.
+         * Each value is independently replaced by null with the given percentage chance.
+         * A single base price and quantity drive the total, bid, ask, high and low figures.
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            final Map<String, Object> trade = new LinkedHashMap<>();
+            trade.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+
+            // Ticker comes from Faker's stock provider; a coin flip picks NASDAQ or NYSE
+            trade.put("tickerSymbol", generateNullableValue(nullPercentage, faker, f -> {
+                if (f.bool().bool()) {
+                    return f.stock().nsdqSymbol();
+                }
+                return f.stock().nyseSymbol();
+            }));
+            // Company name from Faker's company provider
+            trade.put("name", generateNullableValue(nullPercentage, faker, f -> f.company().name()));
+
+            // Exchange from Faker's stock provider
+            trade.put("exchange", generateNullableValue(nullPercentage, faker, f -> f.stock().exchanges()));
+
+            // Only two trade directions exist: buy or sell
+            final List<String> directions = Arrays.asList("BuyAction", "SellAction");
+            trade.put("actionType", generateNullableValue(nullPercentage, faker,
+                    f -> directions.get(f.number().numberBetween(0, directions.size()))));
+            trade.put("dateCreated", generateNullableValue(nullPercentage, faker,
+                    f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 86400000))));
+
+            final double basePrice = faker.number().randomDouble(4, 10, 3000);
+            final long quantity = faker.number().numberBetween(1, 10000);
+            // Shared conversion for every four-decimal quote derived from the base price
+            final Function<Double, BigDecimal> toQuote = amount -> BigDecimal.valueOf(amount).setScale(4, RoundingMode.HALF_UP);
+
+            trade.put("price", generateNullableValue(nullPercentage, faker, f -> toQuote.apply(basePrice)));
+            trade.put("orderQuantity", generateNullableValue(nullPercentage, faker, f -> quantity));
+            trade.put("totalPrice", generateNullableValue(nullPercentage, faker,
+                    f -> BigDecimal.valueOf(basePrice * quantity).setScale(2, RoundingMode.HALF_UP)));
+            // Currency code from Faker's money provider
+            trade.put("priceCurrency", generateNullableValue(nullPercentage, faker, f -> f.money().currencyCode()));
+            trade.put("bidPrice", generateNullableValue(nullPercentage, faker, f -> toQuote.apply(basePrice * 0.999)));
+            trade.put("askPrice", generateNullableValue(nullPercentage, faker, f -> toQuote.apply(basePrice * 1.001)));
+            trade.put("highPrice", generateNullableValue(nullPercentage, faker, f -> toQuote.apply(basePrice * 1.5)));
+            trade.put("lowPrice", generateNullableValue(nullPercentage, faker, f -> toQuote.apply(basePrice * 0.6)));
+            trade.put("marketCap", generateNullableValue(nullPercentage, faker,
+                    f -> f.number().numberBetween(1_000_000_000L, 3_000_000_000_000L)));
+            trade.put("isSettled", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+
+            return trade;
+        }
+    },
+
+    COMPLETE_EXAMPLE("Complete Example", "A comprehensive schema demonstrating 
all supported data types including nested records, arrays, and maps") {
+        /**
+         * Builds the Complete Example schema: scalar, date/time, array and map fields, a
+         * three-level nested record (person -> address -> geo) and an array of order records.
+         * The nullable flag is applied to the fields of every level.
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType text = RecordFieldType.STRING.getDataType();
+            final DataType flag = RecordFieldType.BOOLEAN.getDataType();
+            final DataType wholeNumber = RecordFieldType.INT.getDataType();
+            final DataType decimalNumber = RecordFieldType.DOUBLE.getDataType();
+            final DataType calendarDate = RecordFieldType.DATE.getDataType();
+
+            // Innermost level: coordinates per schema.org/GeoCoordinates
+            final RecordSchema coordinates = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("latitude", decimalNumber, nullable),
+                    new RecordField("longitude", decimalNumber, nullable)
+            ));
+
+            // Middle level: schema.org/PostalAddress wrapping the coordinates
+            final RecordSchema postalAddress = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("streetAddress", text, nullable),
+                    new RecordField("addressLocality", text, nullable),
+                    new RecordField("addressRegion", text, nullable),
+                    new RecordField("postalCode", text, nullable),
+                    new RecordField("addressCountry", text, nullable),
+                    new RecordField("geo", RecordFieldType.RECORD.getRecordDataType(coordinates), nullable)
+            ));
+
+            // Outer level: schema.org/Person wrapping the address
+            final RecordSchema person = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("givenName", text, nullable),
+                    new RecordField("familyName", text, nullable),
+                    new RecordField("email", text, nullable),
+                    new RecordField("age", wholeNumber, nullable),
+                    new RecordField("verified", flag, nullable),
+                    new RecordField("address", RecordFieldType.RECORD.getRecordDataType(postalAddress), nullable)
+            ));
+
+            // Element type of the "orderedItem" array, per schema.org/Order
+            final RecordSchema order = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("orderNumber", text, nullable),
+                    new RecordField("totalPrice", decimalNumber, nullable),
+                    new RecordField("priceCurrency", text, nullable),
+                    new RecordField("orderDate", calendarDate, nullable),
+                    new RecordField("isGift", flag, nullable)
+            ));
+
+            // Top-level schema exercising every type, using schema.org naming
+            final List<RecordField> exampleFields = Arrays.asList(
+                    // Basic types
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("isActive", flag, nullable),
+                    new RecordField("score", wholeNumber, nullable),
+                    new RecordField("count", RecordFieldType.LONG.getDataType(), nullable),
+                    new RecordField("ratingValue", decimalNumber, nullable),
+                    new RecordField("price", RecordFieldType.FLOAT.getDataType(), nullable),
+                    new RecordField("balance", RecordFieldType.DECIMAL.getDecimalDataType(12, 2), nullable),
+                    new RecordField("initial", RecordFieldType.CHAR.getDataType(), nullable),
+                    new RecordField("flags", RecordFieldType.BYTE.getDataType(), nullable),
+                    new RecordField("position", RecordFieldType.SHORT.getDataType(), nullable),
+
+                    // Date/Time types per schema.org
+                    new RecordField("dateCreated", calendarDate, nullable),
+                    new RecordField("lastLogin", RecordFieldType.TIME.getDataType(), nullable),
+                    new RecordField("dateModified", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+
+                    // Complex types
+                    new RecordField("keywords", RecordFieldType.ARRAY.getArrayDataType(text), nullable),
+                    new RecordField("scores", RecordFieldType.ARRAY.getArrayDataType(wholeNumber), nullable),
+                    new RecordField("additionalProperty", RecordFieldType.MAP.getMapDataType(text), nullable),
+                    new RecordField("person", RecordFieldType.RECORD.getRecordDataType(person), nullable),
+                    new RecordField("orderedItem",
+                            RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(order)), nullable)
+            );
+            return new SimpleRecordSchema(exampleFields);
+        }
+
+        /**
+         * Generates one random value per Complete Example field, keyed by field name.
+         * Each top-level value is independently replaced by null with the given percentage chance;
+         * values inside nested records and order elements are never nulled individually.
+         * Nested schemas are resolved from {@code schema}. A nested record whose field is missing
+         * or not of the expected type is omitted instead of causing a failure.
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+
+            // Basic types
+            values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+            values.put("isActive", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+            values.put("score", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 100)));
+            values.put("count", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0L, 1_000_000L)));
+            values.put("ratingValue", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 0, 5)));
+            values.put("price", generateNullableValue(nullPercentage, faker, f -> (float) f.number().randomDouble(2, 1, 1000)));
+            values.put("balance", generateNullableValue(nullPercentage, faker,
+                    f -> BigDecimal.valueOf(f.number().randomDouble(2, -10000, 50000)).setScale(2, RoundingMode.HALF_UP)));
+            values.put("initial", generateNullableValue(nullPercentage, faker, f -> (char) ('A' + f.number().numberBetween(0, 26))));
+            values.put("flags", generateNullableValue(nullPercentage, faker, f -> (byte) f.number().numberBetween(0, 127)));
+            values.put("position", generateNullableValue(nullPercentage, faker, f -> (short) f.number().numberBetween(1, 1000)));
+
+            // Date/Time types per schema.org
+            Instant pastInstant = faker.timeAndDate().past(365, TimeUnit.DAYS);
+            values.put("dateCreated", generateNullableValue(nullPercentage, faker, f -> new Date(pastInstant.toEpochMilli())));
+            values.put("lastLogin", generateNullableValue(nullPercentage, faker,
+                    f -> new Time(f.timeAndDate().past(1, TimeUnit.DAYS).toEpochMilli())));
+            values.put("dateModified", generateNullableValue(nullPercentage, faker,
+                    f -> new Timestamp(f.timeAndDate().past(7, TimeUnit.DAYS).toEpochMilli())));
+
+            // Array of strings (keywords) using Faker's word provider
+            int keywordCount = faker.number().numberBetween(1, 5);
+            String[] keywords = new String[keywordCount];
+            for (int i = 0; i < keywordCount; i++) {
+                keywords[i] = faker.word().adjective();
+            }
+            values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords));
+
+            // Array of integers
+            int scoreCount = faker.number().numberBetween(3, 8);
+            Integer[] scores = new Integer[scoreCount];
+            for (int i = 0; i < scoreCount; i++) {
+                scores[i] = faker.number().numberBetween(50, 100);
+            }
+            values.put("scores", generateNullableValue(nullPercentage, faker, f -> scores));
+
+            // Map (additionalProperty) using Faker providers.
+            // Lower-casing is pinned to Locale.ROOT so output does not depend on the JVM default locale.
+            Map<String, String> additionalProperty = new HashMap<>();
+            additionalProperty.put("source", faker.app().name().toLowerCase(java.util.Locale.ROOT).replace(" ", "-"));
+            additionalProperty.put("version", faker.app().version());
+            additionalProperty.put("environment", faker.options().option("dev", "staging", "prod"));
+            // Use Faker's AWS provider for region
+            additionalProperty.put("region", faker.aws().region());
+            values.put("additionalProperty", generateNullableValue(nullPercentage, faker, f -> additionalProperty));
+
+            // Nested person record (3 levels deep: person -> address -> geo)
+            final RecordSchema personSchema = nestedRecordSchema(schema, "person");
+            if (personSchema != null) {
+                Map<String, Object> personValues = new LinkedHashMap<>();
+                personValues.put("givenName", faker.name().firstName());
+                personValues.put("familyName", faker.name().lastName());
+                personValues.put("email", faker.internet().emailAddress());
+                personValues.put("age", faker.number().numberBetween(18, 80));
+                personValues.put("verified", faker.bool().bool());
+
+                final RecordSchema addressSchema = nestedRecordSchema(personSchema, "address");
+                if (addressSchema != null) {
+                    Map<String, Object> addressValues = new LinkedHashMap<>();
+                    addressValues.put("streetAddress", faker.address().streetAddress());
+                    addressValues.put("addressLocality", faker.address().city());
+                    addressValues.put("addressRegion", faker.address().state());
+                    addressValues.put("postalCode", faker.address().zipCode());
+                    addressValues.put("addressCountry", faker.address().country());
+
+                    final RecordSchema geoSchema = nestedRecordSchema(addressSchema, "geo");
+                    if (geoSchema != null) {
+                        Map<String, Object> geoValues = new LinkedHashMap<>();
+                        geoValues.put("latitude", faker.number().randomDouble(6, -90, 90));
+                        geoValues.put("longitude", faker.number().randomDouble(6, -180, 180));
+                        addressValues.put("geo", new MapRecord(geoSchema, geoValues));
+                    }
+
+                    personValues.put("address", new MapRecord(addressSchema, addressValues));
+                }
+
+                values.put("person", generateNullableValue(nullPercentage, faker, f -> new MapRecord(personSchema, personValues)));
+            }
+
+            // Array of order records (orderedItem): the element schema is only usable when the
+            // field exists, is an ARRAY, and its element type is a RECORD.
+            final RecordSchema orderSchema = schema.getField("orderedItem")
+                    .map(RecordField::getDataType)
+                    .filter(dataType -> dataType.getFieldType() == RecordFieldType.ARRAY)
+                    .map(dataType -> ((ArrayDataType) dataType).getElementType())
+                    .filter(elementType -> elementType.getFieldType() == RecordFieldType.RECORD)
+                    .map(elementType -> ((RecordDataType) elementType).getChildSchema())
+                    .orElse(null);
+
+            if (orderSchema != null) {
+                int orderCount = faker.number().numberBetween(1, 4);
+                Object[] orders = new Object[orderCount];
+                for (int i = 0; i < orderCount; i++) {
+                    Map<String, Object> orderValues = new LinkedHashMap<>();
+                    orderValues.put("orderNumber", "ORD-" + faker.number().digits(8));
+                    orderValues.put("totalPrice", faker.number().randomDouble(2, 10, 500));
+                    // Use Faker's money provider for currency codes
+                    orderValues.put("priceCurrency", faker.money().currencyCode());
+                    orderValues.put("orderDate", new Date(faker.timeAndDate().past(90, TimeUnit.DAYS).toEpochMilli()));
+                    orderValues.put("isGift", faker.bool().bool());
+                    orders[i] = new MapRecord(orderSchema, orderValues);
+                }
+                values.put("orderedItem", generateNullableValue(nullPercentage, faker, f -> orders));
+            }
+
+            return values;
+        }
+
+        /**
+         * Looks up the child schema of a RECORD-typed field.
+         *
+         * @param parent the schema expected to contain the field
+         * @param fieldName the name of the nested record field
+         * @return the nested schema, or null when the field is absent or is not a record
+         */
+        private RecordSchema nestedRecordSchema(RecordSchema parent, String fieldName) {
+            // Optional chain instead of Optional.get(): a missing field yields null, not NoSuchElementException
+            return parent.getField(fieldName)
+                    .map(RecordField::getDataType)
+                    .filter(dataType -> dataType.getFieldType() == RecordFieldType.RECORD)
+                    .map(dataType -> ((RecordDataType) dataType).getChildSchema())
+                    .orElse(null);
+        }
+    };
+
+    // Human-readable label returned by getDisplayName()
+    private final String displayName;
+    // Short explanation of the schema returned by getDescription()
+    private final String description;
+
+    /**
+     * Creates a predefined schema constant.
+     *
+     * @param displayName the label returned by {@link #getDisplayName()}
+     * @param description the text returned by {@link #getDescription()}
+     */
+    PredefinedRecordSchema(String displayName, String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    /**
+     * Returns the enum constant name, which is the same string {@link #fromName(String)} accepts.
+     */
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    /**
+     * Returns the display name given to this constant's constructor.
+     */
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    /**
+     * Returns the description given to this constant's constructor.
+     */
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Get the record schema for this predefined schema type.
+     * A new schema object is built on each call.
+     *
+     * @param nullable whether fields should be declared nullable; the implementations in this
+     *                 enum apply the flag to nested record fields as well as top-level ones
+     * @return the RecordSchema
+     */
+    public abstract RecordSchema getSchema(boolean nullable);
+
+    /**
+     * Generate random values for all fields in this schema.
+     * Implementations that contain nested records look up the nested schemas from the
+     * supplied {@code schema}, so it should normally be one returned by {@link #getSchema(boolean)}.
+     *
+     * @param faker the Faker instance to use for generating random data
+     * @param schema the RecordSchema to generate values for
+     * @param nullPercentage the percentage chance (0-100) that a generated value will be null;
+     *                       this method does not check field nullability itself
+     * @return a Map of field names to generated values
+     */
+    public abstract Map<String, Object> generateValues(Faker faker, 
RecordSchema schema, int nullPercentage);
+
+    /**
+     * Generate a record with random values.
+     * Null values are only produced when {@code nullable} is true. With non-nullable fields
+     * the null percentage is ignored, so the record never contradicts its own schema.
+     *
+     * @param faker the Faker instance to use for generating random data
+     * @param nullable whether fields should be nullable
+     * @param nullPercentage the percentage chance (0-100) that nullable fields will be null
+     * @return a Record with generated values
+     */
+    public Record generateRecord(Faker faker, boolean nullable, int nullPercentage) {
+        final RecordSchema schema = getSchema(nullable);
+        // A non-nullable schema must not receive nulls, whatever percentage the caller passed
+        final int effectiveNullPercentage = nullable ? nullPercentage : 0;
+        final Map<String, Object> values = generateValues(faker, schema, effectiveNullPercentage);
+        return new MapRecord(schema, values);
+    }
+
+    /**
+     * Helper method to generate a value with a chance of being null.
+     */
+    protected static <T> T generateNullableValue(int nullPercentage, Faker 
faker, Function<Faker, T> generator) {
+        if (nullPercentage > 0 && faker.number().numberBetween(0, 100) < 
nullPercentage) {
+            return null;
+        }
+        return generator.apply(faker);
+    }
+
+    /**
+     * Get a predefined schema by its exact constant name.
+     *
+     * @param name the name of the predefined schema, as returned by {@link #getValue()}
+     * @return the matching PredefinedRecordSchema, or null when the name is null, empty or unknown
+     */
+    public static PredefinedRecordSchema fromName(String name) {
+        if (name == null || name.isEmpty()) {
+            return null;
+        }
+        // Exact, case-sensitive comparison against each constant name
+        for (final PredefinedRecordSchema candidate : values()) {
+            if (candidate.name().equals(name)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
index 399a394cd6..bd111cf856 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
@@ -23,6 +23,8 @@ import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.processors.standard.faker.FakerMethodHolder;
 import org.apache.nifi.processors.standard.faker.FakerUtils;
+import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.Record;
@@ -158,6 +160,9 @@ public class TestGenerateRecord {
             testRunner.setProperty(field.getName().toLowerCase(Locale.ROOT), 
((AllowableValue) field.get(processor)).getValue());
         }
 
+        // Add at least one dynamic property to satisfy schema configuration 
validation
+        testRunner.setProperty("testField", "Name.fullName");
+
         final Map<String, String> recordFields = 
processor.getFields(testRunner.getProcessContext());
         final RecordSchema outputSchema = 
processor.generateRecordSchema(recordFields, true);
         final MockRecordWriter recordWriter = new MockRecordWriter(null, true, 
-1, false, outputSchema);
@@ -301,4 +306,127 @@ public class TestGenerateRecord {
         final PropertyMigrationResult propertyMigrationResult = 
testRunner.migrateProperties();
         assertEquals(expectedRenamed, 
propertyMigrationResult.getPropertiesRenamed());
     }
+
+    @Test
+    public void testValidationFailsWithNoSchemaConfiguration() throws InitializationException {
+        final String writerId = "record-writer";
+        final MockRecordWriter writer = new MockRecordWriter(null, true);
+        testRunner.addControllerService(writerId, writer);
+        testRunner.enableControllerService(writer);
+
+        // Nothing in this test sets Schema Text, a Predefined Schema, or a dynamic field property
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, writerId);
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testValidationFailsWithMultipleSchemaConfigurations() throws Exception {
+        // Files.readString decodes as UTF-8, so the schema text does not depend on the JVM default charset
+        final String schemaText = Files.readString(Paths.get("src/test/resources/TestGenerateRecord/nested_nullable.avsc"));
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(null, true);
+        testRunner.addControllerService("record-writer", recordWriter);
+        testRunner.enableControllerService(recordWriter);
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+        // Set both Schema Text and Predefined Schema - should be invalid
+        testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText);
+        testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, PredefinedRecordSchema.PERSON.name());
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testValidationFailsWithPredefinedSchemaAndDynamicProperties() throws Exception {
+        final String writerId = "record-writer";
+        final MockRecordWriter writer = new MockRecordWriter(null, true);
+        testRunner.addControllerService(writerId, writer);
+        testRunner.enableControllerService(writer);
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, writerId);
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+        // A Predefined Schema combined with a dynamic field property is expected to be rejected
+        final String predefinedSchemaName = PredefinedRecordSchema.PERSON.name();
+        testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, predefinedSchemaName);
+        testRunner.setProperty("myField", "Address.fullAddress");
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testPredefinedSchemaPerson() throws Exception {
+        final int recordCount = 5;
+        testPredefinedSchema(PredefinedRecordSchema.PERSON, recordCount);
+    }
+
+    @Test
+    public void testPredefinedSchemaOrder() throws Exception {
+        final int recordCount = 5;
+        testPredefinedSchema(PredefinedRecordSchema.ORDER, recordCount);
+    }
+
+    @Test
+    public void testPredefinedSchemaEvent() throws Exception {
+        final int recordCount = 5;
+        testPredefinedSchema(PredefinedRecordSchema.EVENT, recordCount);
+    }
+
+    @Test
+    public void testPredefinedSchemaSensor() throws Exception {
+        final int recordCount = 5;
+        testPredefinedSchema(PredefinedRecordSchema.SENSOR, recordCount);
+    }
+
+    @Test
+    public void testPredefinedSchemaProduct() throws Exception {
+        final int recordCount = 5;
+        testPredefinedSchema(PredefinedRecordSchema.PRODUCT, recordCount);
+    }
+
+    @Test
+    public void testPredefinedSchemaStockTrade() throws Exception {
+        final int recordCount = 5;
+        testPredefinedSchema(PredefinedRecordSchema.STOCK_TRADE, recordCount);
+    }
+
+    @Test
+    public void testPredefinedSchemaCompleteExample() throws Exception {
+        final int recordCount = 3;
+        testPredefinedSchema(PredefinedRecordSchema.COMPLETE_EXAMPLE, recordCount);
+    }
+
+    /**
+     * Runs GenerateRecord with the given predefined schema and checks the record count of the single output FlowFile.
+     *
+     * @param predefinedSchema the schema whose name is set on the Predefined Schema property
+     * @param numRecords the value set for the number of records and expected in the record.count attribute
+     * @throws Exception if configuring or running the test runner fails
+     */
+    private void testPredefinedSchema(PredefinedRecordSchema predefinedSchema, int numRecords) throws Exception {
+        final String writerId = "record-writer";
+        final String expectedCount = String.valueOf(numRecords);
+
+        // The writer is built with the nullable form of the schema, in line with NULLABLE_FIELDS=true below
+        final MockRecordWriter writer = new MockRecordWriter(null, true, -1, false, predefinedSchema.getSchema(true));
+        testRunner.addControllerService(writerId, writer);
+        testRunner.enableControllerService(writer);
+
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, writerId);
+        testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, predefinedSchema.name());
+        testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true");
+        testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0");
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, expectedCount);
+
+        testRunner.assertValid();
+        testRunner.run();
+
+        testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).getFirst();
+
+        // Verify record count attribute
+        result.assertAttributeEquals("record.count", expectedCount);
+    }
+
+    @Test
+    public void testPredefinedSchemaWithNullPercentage() throws Exception {
+        final String writerId = "record-writer";
+        final PredefinedRecordSchema person = PredefinedRecordSchema.PERSON;
+
+        final MockRecordWriter writer = new MockRecordWriter(null, true, -1, false, person.getSchema(true));
+        testRunner.addControllerService(writerId, writer);
+        testRunner.enableControllerService(writer);
+
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, writerId);
+        testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, person.name());
+        testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true");
+        testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "100");
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+        testRunner.assertValid();
+        testRunner.run();
+
+        testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).getFirst();
+
+        // Only the record count attribute is verified here; the generated field values are not inspected
+        result.assertAttributeEquals("record.count", "1");
+    }
 }

Reply via email to