This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ca768efa25 NIFI-15448 Add option for using Predefined Schemas in
GenerateRecord (#10752)
ca768efa25 is described below
commit ca768efa2540aed6bbf649a107d45424264e9cac
Author: Pierre Villard <[email protected]>
AuthorDate: Sun Feb 1 03:58:24 2026 +0100
NIFI-15448 Add option for using Predefined Schemas in GenerateRecord
(#10752)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/standard/GenerateRecord.java | 150 +++-
.../standard/faker/PredefinedRecordSchema.java | 757 +++++++++++++++++++++
.../processors/standard/TestGenerateRecord.java | 128 ++++
3 files changed, 1005 insertions(+), 30 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
index d355ee58ae..818d7679c8 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
@@ -31,6 +31,8 @@ import org.apache.nifi.avro.AvroSchemaValidator;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -42,6 +44,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.faker.FakerUtils;
+import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -65,6 +68,7 @@ import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -84,9 +88,12 @@ import static
org.apache.nifi.processors.standard.faker.FakerUtils.DEFAULT_DATE_
@WritesAttribute(attribute = "mime.type", description = "Sets the
mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number
of records in the FlowFile"),
})
-@CapabilityDescription("This processor creates FlowFiles with records having
random value for the specified fields. GenerateRecord is useful " +
- "for testing, configuration, and simulation. It uses either
user-defined properties to define a record schema or a provided schema and
generates the specified number of records using " +
- "random data for the fields in the schema.")
+@CapabilityDescription("""
+ This processor creates FlowFiles with records having random value for
the specified fields. GenerateRecord is useful
+ for testing, configuration, and simulation. It uses one of three
methods to define a record schema: (1) a provided Avro Schema Text,
+ (2) a Predefined Schema template such as Person, Order, Event, Sensor,
Product, Stock Trade, or Complete Example covering all data types,
+ or (3) user-defined dynamic properties. The processor generates the
specified number of records using random data for the fields in the schema.
+ """)
@DynamicProperties({
@DynamicProperty(
name = "Field name in generated record",
@@ -122,16 +129,21 @@ public class GenerateRecord extends AbstractProcessor {
static final PropertyDescriptor NULLABLE_FIELDS = new
PropertyDescriptor.Builder()
.name("Nullable Fields")
- .description("Whether the generated fields will be nullable. Note
that this property is ignored if Schema Text is set. Also it only affects the
schema of the generated data, " +
- "not whether any values will be null. If this property is
true, see 'Null Value Percentage' to set the probability that any generated
field will be null.")
+ .description("""
+ Whether the generated fields will be nullable. Note that
this property is ignored if Schema Text is set.
+ Also it only affects the schema of the generated data, not
whether any values will be null.
+ If this property is true, see 'Null Value Percentage' to
set the probability that any generated field will be null.
+ """)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
static final PropertyDescriptor NULL_PERCENTAGE = new
PropertyDescriptor.Builder()
.name("Null Value Percentage")
- .description("The percent probability (0-100%) that a generated
value for any nullable field will be null. Set this property to zero to have no
null values, or 100 to have all " +
- "null values.")
+ .description("""
+ The percent probability (0-100%) that a generated value
for any nullable field will be null.
+ Set this property to zero to have no null values, or 100
to have all null values.
+ """)
.addValidator(StandardValidators.createLongValidator(0L, 100L,
true))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(true)
@@ -141,18 +153,34 @@ public class GenerateRecord extends AbstractProcessor {
static final PropertyDescriptor SCHEMA_TEXT = new
PropertyDescriptor.Builder()
.name("Schema Text")
- .description("The text of an Avro-formatted Schema used to
generate record data. If this property is set, any user-defined properties are
ignored.")
+ .description("""
+ The text of an Avro-formatted Schema used to generate
record data.
+ Only one of Schema Text, Predefined Schema, or
user-defined dynamic properties should be configured.
+ """)
.addValidator(new AvroSchemaValidator())
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
.build();
+ // Optional selector for one of the PredefinedRecordSchema templates; customValidate rejects
+ // configurations that set it together with Schema Text or user-defined dynamic properties.
+ static final PropertyDescriptor PREDEFINED_SCHEMA = new
PropertyDescriptor.Builder()
+ .name("Predefined Schema")
+ .description("""
+ Select a predefined schema template for quick record
generation. Predefined schemas provide ready-to-use
+ templates with multiple fields covering various data types
including nested records, arrays, maps, dates, timestamps, etc.
+ Only one of Schema Text, Predefined Schema, or
user-defined dynamic properties should be configured.
+ Note: This feature is intended for quick testing purposes
only. Predefined schemas may change between NiFi versions.
+ """)
+ .allowableValues(PredefinedRecordSchema.class)
+ .required(false)
+ .build();
+
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
RECORD_WRITER,
NUM_RECORDS,
NULLABLE_FIELDS,
NULL_PERCENTAGE,
- SCHEMA_TEXT
+ SCHEMA_TEXT,
+ PREDEFINED_SCHEMA
);
static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -188,6 +216,46 @@ public class GenerateRecord extends AbstractProcessor {
return RELATIONSHIPS;
}
+    /**
+     * Checks that exactly one schema source is configured: Schema Text, Predefined Schema,
+     * or user-defined dynamic properties.
+     *
+     * @param validationContext context used to inspect the configured properties
+     * @return one invalid result when no source or more than one source is configured;
+     *         otherwise an empty collection
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        // Collect the display names of every configured source, in a fixed order
+        final List<String> configuredSources = new ArrayList<>();
+        if (validationContext.getProperty(SCHEMA_TEXT).isSet()) {
+            configuredSources.add("Schema Text");
+        }
+        if (validationContext.getProperty(PREDEFINED_SCHEMA).isSet()) {
+            configuredSources.add("Predefined Schema");
+        }
+        final boolean hasDynamicProperties = validationContext.getProperties().keySet().stream()
+                .anyMatch(PropertyDescriptor::isDynamic);
+        if (hasDynamicProperties) {
+            configuredSources.add("Dynamic Properties");
+        }
+
+        final List<ValidationResult> results = new ArrayList<>();
+        if (configuredSources.isEmpty()) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Schema Configuration")
+                    .valid(false)
+                    .explanation("At least one schema configuration must be provided: Schema Text, Predefined Schema, or user-defined dynamic properties")
+                    .build());
+        } else if (configuredSources.size() > 1) {
+            // Joining the names avoids a dangling ", " when Dynamic Properties is not among them
+            results.add(new ValidationResult.Builder()
+                    .subject("Schema Configuration")
+                    .valid(false)
+                    .explanation("Only one schema configuration should be provided. Found multiple configurations: "
+                            + String.join(", ", configuredSources))
+                    .build());
+        }
+
+        return results;
+    }
+
@OnScheduled
public void onScheduled(final ProcessContext context) {
// Force the en-US Locale for more predictable results
@@ -198,6 +266,8 @@ public class GenerateRecord extends AbstractProcessor {
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
final String schemaText =
context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions().getValue();
+ final String predefinedSchemaName =
context.getProperty(PREDEFINED_SCHEMA).getValue();
+ final PredefinedRecordSchema predefinedSchema =
PredefinedRecordSchema.fromName(predefinedSchemaName);
final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final int numRecords =
context.getProperty(NUM_RECORDS).evaluateAttributeExpressions().asInteger();
final boolean nullable =
context.getProperty(NULLABLE_FIELDS).asBoolean();
@@ -210,16 +280,21 @@ public class GenerateRecord extends AbstractProcessor {
try {
flowFile = session.write(flowFile, out -> {
final RecordSchema recordSchema;
- final boolean usingSchema;
+ final SchemaSource schemaSource;
if (StringUtils.isNotEmpty(schemaText)) {
+ // Schema Text takes highest precedence
final Schema avroSchema = new
Schema.Parser().parse(schemaText);
recordSchema = AvroTypeUtil.createSchema(avroSchema);
- usingSchema = true;
+ schemaSource = SchemaSource.SCHEMA_TEXT;
+ } else if (predefinedSchema != null) {
+ // Predefined schema takes second precedence
+ recordSchema = predefinedSchema.getSchema(nullable);
+ schemaSource = SchemaSource.PREDEFINED;
} else {
// Generate RecordSchema from user-defined properties
final Map<String, String> fields = getFields(context);
recordSchema = generateRecordSchema(fields, nullable);
- usingSchema = false;
+ schemaSource = SchemaSource.DYNAMIC_PROPERTIES;
}
try {
final RecordSchema writeSchema =
writerFactory.getSchema(attributes, recordSchema);
@@ -227,29 +302,35 @@ public class GenerateRecord extends AbstractProcessor {
writer.beginRecordSet();
Record record;
- List<RecordField> writeFieldNames =
writeSchema.getFields();
- Map<String, Object> recordEntries = new HashMap<>();
for (int i = 0; i < numRecords; i++) {
- for (RecordField writeRecordField :
writeFieldNames) {
- final String writeFieldName =
writeRecordField.getFieldName();
- final Object writeFieldValue;
- if (usingSchema) {
- writeFieldValue =
generateValueFromRecordField(writeRecordField, faker, nullPercentage);
- } else {
- final boolean nullValue =
- nullPercentage > 0 &&
faker.number().numberBetween(0, 100) <= nullPercentage;
-
- if (nullValue) {
- writeFieldValue = null;
+ if (schemaSource == SchemaSource.PREDEFINED) {
+ // Use the predefined schema's optimized value
generation
+ final Map<String, Object> recordEntries =
predefinedSchema.generateValues(faker, recordSchema, nullPercentage);
+ record = new MapRecord(recordSchema,
recordEntries);
+ } else {
+ // Use original logic for Schema Text or
dynamic properties
+ List<RecordField> writeFieldNames =
writeSchema.getFields();
+ Map<String, Object> recordEntries = new
HashMap<>();
+ for (RecordField writeRecordField :
writeFieldNames) {
+ final String writeFieldName =
writeRecordField.getFieldName();
+ final Object writeFieldValue;
+ if (schemaSource ==
SchemaSource.SCHEMA_TEXT) {
+ writeFieldValue =
generateValueFromRecordField(writeRecordField, faker, nullPercentage);
} else {
- final String propertyValue =
context.getProperty(writeFieldName).getValue();
- writeFieldValue =
FakerUtils.getFakeData(propertyValue, faker);
+ final boolean nullValue =
+ nullPercentage > 0 &&
faker.number().numberBetween(0, 100) <= nullPercentage;
+
+ if (nullValue) {
+ writeFieldValue = null;
+ } else {
+ final String propertyValue =
context.getProperty(writeFieldName).getValue();
+ writeFieldValue =
FakerUtils.getFakeData(propertyValue, faker);
+ }
}
+ recordEntries.put(writeFieldName,
writeFieldValue);
}
-
- recordEntries.put(writeFieldName,
writeFieldValue);
+ record = new MapRecord(recordSchema,
recordEntries);
}
- record = new MapRecord(recordSchema,
recordEntries);
writer.write(record);
}
@@ -403,4 +484,13 @@ public class GenerateRecord extends AbstractProcessor {
}
return new SimpleRecordSchema(recordFields);
}
+
+    /**
+     * Identifies where the record schema for the current invocation came from.
+     * onTrigger selects the value while resolving the schema and later uses it
+     * to choose how field values are generated.
+     */
+    private enum SchemaSource {
+        /** The Schema Text property was not empty and was parsed as an Avro schema. */
+        SCHEMA_TEXT,
+
+        /** Schema Text was empty and a predefined schema template was selected. */
+        PREDEFINED,
+
+        /** Neither of the above applied; the schema is built from user-defined properties. */
+        DYNAMIC_PROPERTIES
+    }
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java
new file mode 100644
index 0000000000..965d9ce4e3
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.faker;
+
+import net.datafaker.Faker;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Predefined record schemas for the GenerateRecord processor.
+ * These schemas provide ready-to-use templates for generating fake data
+ * without requiring manual configuration of dynamic properties.
+ */
+public enum PredefinedRecordSchema implements DescribedValue {
+
+ PERSON("Person", "A person with name, contact information, and address
(schema.org/Person)") {
+        /**
+         * Builds the Person schema: nine top-level fields, the last of which is a nested address record.
+         *
+         * @param nullable nullability flag passed to every field, including the nested address fields
+         * @return the Person record schema
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType text = RecordFieldType.STRING.getDataType();
+
+            // Nested address record; field names follow schema.org/PostalAddress
+            final RecordSchema postalAddress = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("streetAddress", text, nullable),
+                    new RecordField("addressLocality", text, nullable),
+                    new RecordField("addressRegion", text, nullable),
+                    new RecordField("postalCode", text, nullable),
+                    new RecordField("addressCountry", text, nullable)
+            ));
+
+            // Top-level fields; names follow schema.org/Person
+            return new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("givenName", text, nullable),
+                    new RecordField("familyName", text, nullable),
+                    new RecordField("email", text, nullable),
+                    new RecordField("telephone", text, nullable),
+                    new RecordField("birthDate", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("age", RecordFieldType.INT.getDataType(), nullable),
+                    new RecordField("active", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("address", RecordFieldType.RECORD.getRecordDataType(postalAddress), nullable)
+            ));
+        }
+
+        /**
+         * Generates the values of one Person record, keyed by field name.
+         *
+         * @param faker source of the fake data
+         * @param schema schema that must contain an "address" field; when that field is a record type,
+         *               its child schema backs the nested address record
+         * @param nullPercentage forwarded to generateNullableValue for every generated value
+         * @return the generated values in insertion order
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            final Map<String, Object> person = new LinkedHashMap<>();
+            person.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+            person.put("givenName", generateNullableValue(nullPercentage, faker, f -> f.name().firstName()));
+            person.put("familyName", generateNullableValue(nullPercentage, faker, f -> f.name().lastName()));
+            person.put("email", generateNullableValue(nullPercentage, faker, f -> f.internet().emailAddress()));
+            person.put("telephone", generateNullableValue(nullPercentage, faker, f -> f.phoneNumber().phoneNumber()));
+            person.put("birthDate", generateNullableValue(nullPercentage, faker,
+                    f -> Date.valueOf(f.timeAndDate().birthday(18, 80))));
+            person.put("age", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(18, 80)));
+            person.put("active", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+
+            // The nested address values are produced before the address field type is inspected
+            final Map<String, Object> postalAddress = new LinkedHashMap<>();
+            postalAddress.put("streetAddress", generateNullableValue(nullPercentage, faker, f -> f.address().streetAddress()));
+            postalAddress.put("addressLocality", generateNullableValue(nullPercentage, faker, f -> f.address().city()));
+            postalAddress.put("addressRegion", generateNullableValue(nullPercentage, faker, f -> f.address().state()));
+            postalAddress.put("postalCode", generateNullableValue(nullPercentage, faker, f -> f.address().zipCode()));
+            postalAddress.put("addressCountry", generateNullableValue(nullPercentage, faker, f -> f.address().country()));
+
+            // Only emit "address" when the field is a record type that exposes a child schema
+            final DataType addressType = schema.getField("address").get().getDataType();
+            if (addressType.getFieldType() == RecordFieldType.RECORD) {
+                final RecordSchema addressSchema = ((RecordDataType) addressType).getChildSchema();
+                if (addressSchema != null) {
+                    person.put("address", generateNullableValue(nullPercentage, faker,
+                            f -> new MapRecord(addressSchema, postalAddress)));
+                }
+            }
+
+            return person;
+        }
+ },
+
+ ORDER("Order", "An e-commerce order with line items, amounts, and
timestamps (schema.org/Order)") {
+        /**
+         * Builds the Order schema: scalar order fields plus an array of nested line-item records.
+         *
+         * @param nullable nullability flag passed to every field, including the line-item fields
+         * @return the Order record schema
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType text = RecordFieldType.STRING.getDataType();
+            final DataType uuid = RecordFieldType.UUID.getDataType();
+            final DataType integer = RecordFieldType.INT.getDataType();
+
+            // Line-item record; field names follow schema.org/OrderItem
+            final RecordSchema lineItem = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("identifier", text, nullable),
+                    new RecordField("name", text, nullable),
+                    new RecordField("orderQuantity", integer, nullable),
+                    new RecordField("price", RecordFieldType.DOUBLE.getDataType(), nullable)
+            ));
+            final DataType lineItems = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(lineItem));
+
+            // Top-level fields; names follow schema.org/Order
+            return new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("orderNumber", uuid, nullable),
+                    new RecordField("customer", uuid, nullable),
+                    new RecordField("customerName", text, nullable),
+                    new RecordField("customerEmail", text, nullable),
+                    new RecordField("orderDate", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("orderTime", RecordFieldType.TIME.getDataType(), nullable),
+                    new RecordField("orderDelivery", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("totalPrice", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable),
+                    new RecordField("priceCurrency", text, nullable),
+                    new RecordField("orderStatus", text, nullable),
+                    new RecordField("isGift", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("itemCount", integer, nullable),
+                    new RecordField("orderedItem", lineItems, nullable)
+            ));
+        }
+
+        /**
+         * Generates the values of one Order record, keyed by field name.
+         * The date, time and delivery timestamp are all derived from one generated instant, and
+         * totalPrice is the sum of quantity * price over the generated line items.
+         *
+         * @param faker source of the fake data
+         * @param schema schema that must contain an "orderedItem" field; when it is an array of
+         *               records, the element's child schema backs each line-item record
+         * @param nullPercentage forwarded to generateNullableValue for the top-level values
+         * @return the generated values in insertion order
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            final Map<String, Object> order = new LinkedHashMap<>();
+            order.put("orderNumber", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+            order.put("customer", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+            order.put("customerName", generateNullableValue(nullPercentage, faker, f -> f.name().fullName()));
+            order.put("customerEmail", generateNullableValue(nullPercentage, faker, f -> f.internet().emailAddress()));
+
+            // One instant feeds the date, time and timestamp fields
+            final long orderMillis = faker.timeAndDate().past(365, TimeUnit.DAYS).toEpochMilli();
+            order.put("orderDate", generateNullableValue(nullPercentage, faker, f -> new Date(orderMillis)));
+            order.put("orderTime", generateNullableValue(nullPercentage, faker, f -> new Time(orderMillis)));
+            order.put("orderDelivery", generateNullableValue(nullPercentage, faker, f -> new Timestamp(orderMillis)));
+
+            // OrderStatus values per schema.org/OrderStatus
+            final String[] orderStatuses = {
+                    "OrderCancelled", "OrderDelivered", "OrderInTransit", "OrderPaymentDue",
+                    "OrderPickupAvailable", "OrderProblem", "OrderProcessing", "OrderReturned"
+            };
+            final String chosenStatus = orderStatuses[faker.number().numberBetween(0, orderStatuses.length)];
+            order.put("orderStatus", generateNullableValue(nullPercentage, faker, f -> chosenStatus));
+            order.put("isGift", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+            order.put("priceCurrency", generateNullableValue(nullPercentage, faker, f -> f.money().currencyCode()));
+
+            final int itemCount = faker.number().numberBetween(1, 5);
+            order.put("itemCount", generateNullableValue(nullPercentage, faker, f -> itemCount));
+
+            // Resolve the line-item schema; it stays null unless the field is an array of records
+            RecordSchema lineItemSchema = null;
+            final DataType orderedItemType = schema.getField("orderedItem").get().getDataType();
+            if (orderedItemType.getFieldType() == RecordFieldType.ARRAY) {
+                final DataType elementType = ((ArrayDataType) orderedItemType).getElementType();
+                if (elementType.getFieldType() == RecordFieldType.RECORD) {
+                    lineItemSchema = ((RecordDataType) elementType).getChildSchema();
+                }
+            }
+
+            double runningTotal = 0.0;
+            final Object[] lineItems = new Object[itemCount];
+            for (int index = 0; index < itemCount; index++) {
+                final Map<String, Object> lineItem = new LinkedHashMap<>();
+                lineItem.put("identifier", "PRD-" + faker.number().digits(8));
+                lineItem.put("name", faker.commerce().productName());
+                final int quantity = faker.number().numberBetween(1, 10);
+                final double unitPrice = faker.number().randomDouble(2, 10, 500);
+                lineItem.put("orderQuantity", quantity);
+                lineItem.put("price", unitPrice);
+                runningTotal += quantity * unitPrice;
+
+                // Without a line-item schema the array slot keeps its default null
+                if (lineItemSchema != null) {
+                    lineItems[index] = new MapRecord(lineItemSchema, lineItem);
+                }
+            }
+            order.put("orderedItem", generateNullableValue(nullPercentage, faker, f -> lineItems));
+
+            final double orderTotal = runningTotal;
+            order.put("totalPrice", generateNullableValue(nullPercentage, faker,
+                    f -> BigDecimal.valueOf(orderTotal).setScale(2, RoundingMode.HALF_UP)));
+
+            return order;
+        }
+ },
+
+ EVENT("Event", "A timestamped event with metadata and keywords
(schema.org/Event)") {
+        /**
+         * Builds the Event schema: scalar fields, a string array of keywords and a string-valued map.
+         *
+         * @param nullable nullability flag passed to every field
+         * @return the Event record schema
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType text = RecordFieldType.STRING.getDataType();
+            final DataType keywordArray = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+            final DataType propertyMap = RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType());
+
+            // Field names follow schema.org/Event
+            return new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("additionalType", text, nullable),
+                    new RecordField("startDate", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("startTime", RecordFieldType.TIME.getDataType(), nullable),
+                    new RecordField("endDate", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("organizer", text, nullable),
+                    new RecordField("eventStatus", text, nullable),
+                    new RecordField("description", text, nullable),
+                    new RecordField("isAccessibleForFree", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("attendeeCount", RecordFieldType.INT.getDataType(), nullable),
+                    new RecordField("duration", RecordFieldType.LONG.getDataType(), nullable),
+                    new RecordField("keywords", keywordArray, nullable),
+                    new RecordField("additionalProperty", propertyMap, nullable)
+            ));
+        }
+
+        /**
+         * Generates the values of one Event record, keyed by field name.
+         * The start date, start time and end timestamp are all derived from one generated instant.
+         * Case conversions use {@link Locale#ROOT} so the generated identifiers do not depend on the
+         * JVM default locale.
+         *
+         * @param faker source of the fake data
+         * @param schema the Event schema; not consulted by this implementation
+         * @param nullPercentage forwarded to generateNullableValue for every top-level value
+         * @return the generated values in insertion order
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+            values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+
+            // Use Faker's hacker provider for event type naming
+            values.put("additionalType", generateNullableValue(nullPercentage, faker, f ->
+                    f.hacker().verb().toUpperCase(Locale.ROOT) + "_" + f.hacker().noun().toUpperCase(Locale.ROOT)));
+
+            Instant eventInstant = faker.timeAndDate().past(30, TimeUnit.DAYS);
+            values.put("startDate", generateNullableValue(nullPercentage, faker, f -> new Date(eventInstant.toEpochMilli())));
+            values.put("startTime", generateNullableValue(nullPercentage, faker, f -> new Time(eventInstant.toEpochMilli())));
+            values.put("endDate", generateNullableValue(nullPercentage, faker, f -> new Timestamp(eventInstant.toEpochMilli())));
+
+            // Use Faker's app provider for organizer names
+            values.put("organizer", generateNullableValue(nullPercentage, faker,
+                    f -> f.app().name().toLowerCase(Locale.ROOT).replace(" ", "-")));
+
+            // EventStatus values per schema.org/EventStatusType
+            String[] statuses = {"EventCancelled", "EventMovedOnline", "EventPostponed", "EventRescheduled", "EventScheduled"};
+            values.put("eventStatus", generateNullableValue(nullPercentage, faker, f -> statuses[f.number().numberBetween(0, statuses.length)]));
+            values.put("description", generateNullableValue(nullPercentage, faker, f -> f.lorem().sentence()));
+            values.put("isAccessibleForFree", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+            values.put("attendeeCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5)));
+            values.put("duration", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(1L, 10000L)));
+
+            // Generate keywords array using Faker's marketing buzzwords
+            int keywordCount = faker.number().numberBetween(1, 4);
+            String[] keywords = new String[keywordCount];
+            for (int i = 0; i < keywordCount; i++) {
+                keywords[i] = faker.marketing().buzzwords().toLowerCase(Locale.ROOT);
+            }
+            values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords));
+
+            // Generate additionalProperty map using Faker providers
+            Map<String, String> additionalProperty = new HashMap<>();
+            additionalProperty.put("version", faker.app().version());
+            additionalProperty.put("environment", faker.options().option("dev", "staging", "prod"));
+            // Use Faker's AWS provider for region
+            additionalProperty.put("region", faker.aws().region());
+            additionalProperty.put("correlationId", UUID.randomUUID().toString());
+            values.put("additionalProperty", generateNullableValue(nullPercentage, faker, f -> additionalProperty));
+
+            return values;
+        }
+ },
+
+ SENSOR("Sensor", "An IoT sensor reading with geo coordinates and
measurements") {
+        /**
+         * Builds the Sensor schema: device and measurement fields plus a nested geo-coordinates record.
+         *
+         * @param nullable nullability flag passed to every field, including the nested geo fields
+         * @return the Sensor record schema
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            final DataType text = RecordFieldType.STRING.getDataType();
+            final DataType decimalNumber = RecordFieldType.DOUBLE.getDataType();
+            final DataType integer = RecordFieldType.INT.getDataType();
+
+            // Nested coordinates record; field names follow schema.org/GeoCoordinates
+            final RecordSchema coordinates = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("latitude", decimalNumber, nullable),
+                    new RecordField("longitude", decimalNumber, nullable),
+                    new RecordField("elevation", decimalNumber, nullable)
+            ));
+
+            return new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("identifier", text, nullable),
+                    new RecordField("additionalType", text, nullable),
+                    new RecordField("manufacturer", text, nullable),
+                    new RecordField("dateCreated", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("temperature", decimalNumber, nullable),
+                    new RecordField("humidity", decimalNumber, nullable),
+                    new RecordField("pressure", decimalNumber, nullable),
+                    new RecordField("batteryLevel", integer, nullable),
+                    new RecordField("signalStrength", integer, nullable),
+                    new RecordField("isActive", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("geo", RecordFieldType.RECORD.getRecordDataType(coordinates), nullable)
+            ));
+        }
+
+        /**
+         * Generates the values of one Sensor record, keyed by field name.
+         * The nested coordinate values are written directly, without going through generateNullableValue;
+         * only the enclosing "geo" value does.
+         *
+         * @param faker source of the fake data
+         * @param schema schema that must contain a "geo" field; when that field is a record type,
+         *               its child schema backs the nested coordinates record
+         * @param nullPercentage forwarded to generateNullableValue for the top-level values
+         * @return the generated values in insertion order
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            final Map<String, Object> reading = new LinkedHashMap<>();
+
+            // Device identity comes from Faker's device provider
+            reading.put("identifier", generateNullableValue(nullPercentage, faker, f -> f.device().serial()));
+            reading.put("additionalType", generateNullableValue(nullPercentage, faker, f -> f.device().platform()));
+            reading.put("manufacturer", generateNullableValue(nullPercentage, faker, f -> f.device().manufacturer()));
+
+            // Current time minus a generated millisecond offset
+            reading.put("dateCreated", generateNullableValue(nullPercentage, faker,
+                    f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 3600000))));
+            reading.put("temperature", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, -20, 45)));
+            reading.put("humidity", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 0, 100)));
+            reading.put("pressure", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 980, 1050)));
+            reading.put("batteryLevel", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 100)));
+            reading.put("signalStrength", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(-100, -30)));
+            reading.put("isActive", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+
+            // The nested coordinate values are produced before the geo field type is inspected
+            final Map<String, Object> coordinates = new LinkedHashMap<>();
+            coordinates.put("latitude", faker.number().randomDouble(6, -90, 90));
+            coordinates.put("longitude", faker.number().randomDouble(6, -180, 180));
+            coordinates.put("elevation", faker.number().randomDouble(2, 0, 3000));
+
+            // Only emit "geo" when the field is a record type that exposes a child schema
+            final DataType geoType = schema.getField("geo").get().getDataType();
+            if (geoType.getFieldType() == RecordFieldType.RECORD) {
+                final RecordSchema geoSchema = ((RecordDataType) geoType).getChildSchema();
+                if (geoSchema != null) {
+                    reading.put("geo", generateNullableValue(nullPercentage, faker,
+                            f -> new MapRecord(geoSchema, coordinates)));
+                }
+            }
+
+            return reading;
+        }
+ },
+
+    PRODUCT("Product", "A product catalog entry with pricing and inventory (schema.org/Product)") {
+        /**
+         * Builds the Product schema: scalar catalog fields, a string array of keywords and a
+         * nested "additionalProperty" record holding the physical dimensions.
+         *
+         * @param nullable nullability flag applied to every field, including the nested dimension fields
+         * @return the Product record schema
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            // QuantitativeValue for dimensions per schema.org/QuantitativeValue
+            List<RecordField> dimensionFields = Arrays.asList(
+                    new RecordField("depth", RecordFieldType.DOUBLE.getDataType(), nullable),
+                    new RecordField("width", RecordFieldType.DOUBLE.getDataType(), nullable),
+                    new RecordField("height", RecordFieldType.DOUBLE.getDataType(), nullable),
+                    new RecordField("weight", RecordFieldType.DOUBLE.getDataType(), nullable)
+            );
+            RecordSchema dimensionSchema = new SimpleRecordSchema(dimensionFields);
+
+            // Product fields per schema.org/Product
+            List<RecordField> fields = Arrays.asList(
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("sku", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("name", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("description", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("category", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("brand", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("price", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable),
+                    new RecordField("priceCurrency", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("availability", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("inventoryLevel", RecordFieldType.INT.getDataType(), nullable),
+                    new RecordField("ratingValue", RecordFieldType.DOUBLE.getDataType(), nullable),
+                    new RecordField("reviewCount", RecordFieldType.INT.getDataType(), nullable),
+                    new RecordField("dateCreated", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("dateModified", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("keywords", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable),
+                    new RecordField("additionalProperty", RecordFieldType.RECORD.getRecordDataType(dimensionSchema), nullable)
+            );
+            return new SimpleRecordSchema(fields);
+        }
+
+        /**
+         * Generates one product entry keyed by field name.
+         * "availability" is derived from the generated inventory level, and "dateModified" is
+         * never earlier than the creation instant drawn for "dateCreated".
+         *
+         * @param faker the Faker instance supplying random data
+         * @param schema the Product schema; its "additionalProperty" field provides the nested record schema
+         * @param nullPercentage the percentage chance (0-100) that a top-level field is null
+         * @return field names mapped to generated values, in insertion order
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+            values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+            values.put("sku", generateNullableValue(nullPercentage, faker, f -> "SKU-" + f.number().digits(8)));
+            values.put("name", generateNullableValue(nullPercentage, faker, f -> f.commerce().productName()));
+            values.put("description", generateNullableValue(nullPercentage, faker, f -> f.lorem().paragraph()));
+            values.put("category", generateNullableValue(nullPercentage, faker, f -> f.commerce().department()));
+            // Use Faker's commerce provider for brand
+            values.put("brand", generateNullableValue(nullPercentage, faker, f -> f.commerce().brand()));
+            values.put("price", generateNullableValue(nullPercentage, faker,
+                    f -> BigDecimal.valueOf(f.number().randomDouble(2, 5, 2000)).setScale(2, RoundingMode.HALF_UP)));
+
+            // Use Faker's money provider for currency codes
+            values.put("priceCurrency", generateNullableValue(nullPercentage, faker, f -> f.money().currencyCode()));
+
+            // One inventory draw feeds both fields so availability always agrees with the level
+            int inventoryLevel = faker.number().numberBetween(0, 500);
+            values.put("availability", generateNullableValue(nullPercentage, faker, f -> inventoryLevel > 0));
+            values.put("inventoryLevel", generateNullableValue(nullPercentage, faker, f -> inventoryLevel));
+            values.put("ratingValue", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(1, 1, 5)));
+            values.put("reviewCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5000)));
+
+            // The creation instant is drawn once so the modification timestamp can be clamped to it;
+            // two independent draws could otherwise yield a product modified before it was created
+            final Instant createdInstant = faker.timeAndDate().past(365, TimeUnit.DAYS);
+            values.put("dateCreated", generateNullableValue(nullPercentage, faker, f -> new Date(createdInstant.toEpochMilli())));
+            values.put("dateModified", generateNullableValue(nullPercentage, faker, f -> {
+                final long modifiedMillis = f.timeAndDate().past(30, TimeUnit.DAYS).toEpochMilli();
+                return new Timestamp(Math.max(modifiedMillis, createdInstant.toEpochMilli()));
+            }));
+
+            // Generate keywords array using Faker's marketing buzzwords (may be empty)
+            int keywordCount = faker.number().numberBetween(0, 4);
+            String[] keywords = new String[keywordCount];
+            for (int i = 0; i < keywordCount; i++) {
+                keywords[i] = faker.marketing().buzzwords().toLowerCase();
+            }
+            values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords));
+
+            // Generate nested additionalProperty record (dimensions); its fields are always populated
+            Map<String, Object> dimensionValues = new LinkedHashMap<>();
+            dimensionValues.put("depth", faker.number().randomDouble(2, 1, 100));
+            dimensionValues.put("width", faker.number().randomDouble(2, 1, 100));
+            dimensionValues.put("height", faker.number().randomDouble(2, 1, 100));
+            dimensionValues.put("weight", faker.number().randomDouble(2, 1, 50));
+
+            // Only emit the nested record when the schema declares the field as a record
+            final DataType dimensionType = schema.getField("additionalProperty").get().getDataType();
+            RecordSchema dimensionSchema = dimensionType.getFieldType() == RecordFieldType.RECORD
+                    ? ((RecordDataType) dimensionType).getChildSchema()
+                    : null;
+
+            if (dimensionSchema != null) {
+                values.put("additionalProperty",
+                        generateNullableValue(nullPercentage, faker, f -> new MapRecord(dimensionSchema, dimensionValues)));
+            }
+
+            return values;
+        }
+    },
+
+    STOCK_TRADE("Stock Trade", "A stock market trade with pricing and volume") {
+        /**
+         * Builds the flat Stock Trade schema.
+         *
+         * @param nullable nullability flag applied to every field
+         * @return the Stock Trade record schema
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            // Using schema.org naming conventions where applicable
+            List<RecordField> fields = Arrays.asList(
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("tickerSymbol", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("name", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("exchange", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("actionType", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("dateCreated", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+                    new RecordField("price", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable),
+                    new RecordField("orderQuantity", RecordFieldType.LONG.getDataType(), nullable),
+                    new RecordField("totalPrice", RecordFieldType.DECIMAL.getDecimalDataType(16, 2), nullable),
+                    new RecordField("priceCurrency", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("bidPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable),
+                    new RecordField("askPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable),
+                    new RecordField("highPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable),
+                    new RecordField("lowPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable),
+                    new RecordField("marketCap", RecordFieldType.LONG.getDataType(), nullable),
+                    new RecordField("isSettled", RecordFieldType.BOOLEAN.getDataType(), nullable)
+            );
+            return new SimpleRecordSchema(fields);
+        }
+
+        /**
+         * Generates one trade keyed by field name.
+         * A single price and quantity are drawn per record; total, bid, ask, high and low are all
+         * derived from that price with exact decimal arithmetic so the record stays consistent.
+         *
+         * @param faker the Faker instance supplying random data
+         * @param schema the Stock Trade schema (not consulted; the schema is flat)
+         * @param nullPercentage the percentage chance (0-100) that a field is null
+         * @return field names mapped to generated values, in insertion order
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+            values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+
+            // Use Faker's stock provider for symbols (randomly choose between NASDAQ and NYSE)
+            values.put("tickerSymbol", generateNullableValue(nullPercentage, faker,
+                    f -> f.bool().bool() ? f.stock().nsdqSymbol() : f.stock().nyseSymbol()));
+            // Use Faker's company provider for company names
+            values.put("name", generateNullableValue(nullPercentage, faker, f -> f.company().name()));
+
+            // Use Faker's stock provider for exchanges
+            values.put("exchange", generateNullableValue(nullPercentage, faker, f -> f.stock().exchanges()));
+
+            // Action types use schema.org action naming (BuyAction / SellAction)
+            String[] actionTypes = {"BuyAction", "SellAction"};
+            values.put("actionType", generateNullableValue(nullPercentage, faker,
+                    f -> actionTypes[f.number().numberBetween(0, actionTypes.length)]));
+            values.put("dateCreated", generateNullableValue(nullPercentage, faker,
+                    f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 86400000))));
+
+            // Convert the drawn price to a scaled decimal once; deriving the other amounts from the
+            // decimal avoids binary floating-point error leaking into the rounded monetary values
+            final BigDecimal price = BigDecimal.valueOf(faker.number().randomDouble(4, 10, 3000)).setScale(4, RoundingMode.HALF_UP);
+            final long orderQuantity = faker.number().numberBetween(1, 10000);
+            values.put("price", generateNullableValue(nullPercentage, faker, f -> price));
+            values.put("orderQuantity", generateNullableValue(nullPercentage, faker, f -> orderQuantity));
+            values.put("totalPrice", generateNullableValue(nullPercentage, faker,
+                    f -> price.multiply(BigDecimal.valueOf(orderQuantity)).setScale(2, RoundingMode.HALF_UP)));
+            // Use Faker's money provider for currency codes
+            values.put("priceCurrency", generateNullableValue(nullPercentage, faker, f -> f.money().currencyCode()));
+            // Bid sits just below and ask just above the traded price; high/low bracket it more widely
+            values.put("bidPrice", generateNullableValue(nullPercentage, faker,
+                    f -> price.multiply(new BigDecimal("0.999")).setScale(4, RoundingMode.HALF_UP)));
+            values.put("askPrice", generateNullableValue(nullPercentage, faker,
+                    f -> price.multiply(new BigDecimal("1.001")).setScale(4, RoundingMode.HALF_UP)));
+            values.put("highPrice", generateNullableValue(nullPercentage, faker,
+                    f -> price.multiply(new BigDecimal("1.5")).setScale(4, RoundingMode.HALF_UP)));
+            values.put("lowPrice", generateNullableValue(nullPercentage, faker,
+                    f -> price.multiply(new BigDecimal("0.6")).setScale(4, RoundingMode.HALF_UP)));
+            values.put("marketCap", generateNullableValue(nullPercentage, faker,
+                    f -> f.number().numberBetween(1_000_000_000L, 3_000_000_000_000L)));
+            values.put("isSettled", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+
+            return values;
+        }
+    },
+
+    COMPLETE_EXAMPLE("Complete Example", "A comprehensive schema demonstrating all supported data types including nested records, arrays, and maps") {
+        /**
+         * Builds a schema covering scalar, date/time, array, map and record types, including a
+         * person record nested three levels deep (person -> address -> geo) and an array of order records.
+         *
+         * @param nullable nullability flag applied to every field at every nesting level
+         * @return the Complete Example record schema
+         */
+        @Override
+        public RecordSchema getSchema(boolean nullable) {
+            // GeoCoordinates per schema.org/GeoCoordinates
+            List<RecordField> geoFields = Arrays.asList(
+                    new RecordField("latitude", RecordFieldType.DOUBLE.getDataType(), nullable),
+                    new RecordField("longitude", RecordFieldType.DOUBLE.getDataType(), nullable)
+            );
+            RecordSchema geoSchema = new SimpleRecordSchema(geoFields);
+
+            // PostalAddress per schema.org/PostalAddress with nested geo
+            List<RecordField> addressFields = Arrays.asList(
+                    new RecordField("streetAddress", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("addressLocality", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("addressRegion", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("postalCode", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("addressCountry", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("geo", RecordFieldType.RECORD.getRecordDataType(geoSchema), nullable)
+            );
+            RecordSchema addressSchema = new SimpleRecordSchema(addressFields);
+
+            // Person per schema.org/Person with nested address
+            List<RecordField> personFields = Arrays.asList(
+                    new RecordField("givenName", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("familyName", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("email", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("age", RecordFieldType.INT.getDataType(), nullable),
+                    new RecordField("verified", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("address", RecordFieldType.RECORD.getRecordDataType(addressSchema), nullable)
+            );
+            RecordSchema personSchema = new SimpleRecordSchema(personFields);
+
+            // Order per schema.org/Order for array of records
+            List<RecordField> orderFields = Arrays.asList(
+                    new RecordField("orderNumber", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("totalPrice", RecordFieldType.DOUBLE.getDataType(), nullable),
+                    new RecordField("priceCurrency", RecordFieldType.STRING.getDataType(), nullable),
+                    new RecordField("orderDate", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("isGift", RecordFieldType.BOOLEAN.getDataType(), nullable)
+            );
+            RecordSchema orderSchema = new SimpleRecordSchema(orderFields);
+
+            // Main schema with all types using schema.org naming
+            List<RecordField> fields = Arrays.asList(
+                    // Basic types
+                    new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable),
+                    new RecordField("isActive", RecordFieldType.BOOLEAN.getDataType(), nullable),
+                    new RecordField("score", RecordFieldType.INT.getDataType(), nullable),
+                    new RecordField("count", RecordFieldType.LONG.getDataType(), nullable),
+                    new RecordField("ratingValue", RecordFieldType.DOUBLE.getDataType(), nullable),
+                    new RecordField("price", RecordFieldType.FLOAT.getDataType(), nullable),
+                    new RecordField("balance", RecordFieldType.DECIMAL.getDecimalDataType(12, 2), nullable),
+                    new RecordField("initial", RecordFieldType.CHAR.getDataType(), nullable),
+                    new RecordField("flags", RecordFieldType.BYTE.getDataType(), nullable),
+                    new RecordField("position", RecordFieldType.SHORT.getDataType(), nullable),
+
+                    // Date/Time types per schema.org
+                    new RecordField("dateCreated", RecordFieldType.DATE.getDataType(), nullable),
+                    new RecordField("lastLogin", RecordFieldType.TIME.getDataType(), nullable),
+                    new RecordField("dateModified", RecordFieldType.TIMESTAMP.getDataType(), nullable),
+
+                    // Complex types
+                    new RecordField("keywords", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable),
+                    new RecordField("scores", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()), nullable),
+                    new RecordField("additionalProperty", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), nullable),
+                    new RecordField("person", RecordFieldType.RECORD.getRecordDataType(personSchema), nullable),
+                    new RecordField("orderedItem",
+                            RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(orderSchema)), nullable)
+            );
+            return new SimpleRecordSchema(fields);
+        }
+
+        /**
+         * Generates one record exercising every field of the Complete Example schema.
+         * Only top-level fields pass through {@code generateNullableValue}; values inside nested
+         * records, arrays and the map are always populated. "dateModified" is never earlier than
+         * the instant drawn for "dateCreated".
+         *
+         * @param faker the Faker instance supplying random data
+         * @param schema the Complete Example schema; nested record schemas are resolved from its fields
+         * @param nullPercentage the percentage chance (0-100) that a top-level field is null
+         * @return field names mapped to generated values, in insertion order
+         */
+        @Override
+        public Map<String, Object> generateValues(Faker faker, RecordSchema schema, int nullPercentage) {
+            Map<String, Object> values = new LinkedHashMap<>();
+
+            // Basic types
+            values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID()));
+            values.put("isActive", generateNullableValue(nullPercentage, faker, f -> f.bool().bool()));
+            values.put("score", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 100)));
+            values.put("count", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0L, 1_000_000L)));
+            values.put("ratingValue", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 0, 5)));
+            values.put("price", generateNullableValue(nullPercentage, faker, f -> (float) f.number().randomDouble(2, 1, 1000)));
+            values.put("balance", generateNullableValue(nullPercentage, faker,
+                    f -> BigDecimal.valueOf(f.number().randomDouble(2, -10000, 50000)).setScale(2, RoundingMode.HALF_UP)));
+            values.put("initial", generateNullableValue(nullPercentage, faker, f -> (char) ('A' + f.number().numberBetween(0, 26))));
+            values.put("flags", generateNullableValue(nullPercentage, faker, f -> (byte) f.number().numberBetween(0, 127)));
+            values.put("position", generateNullableValue(nullPercentage, faker, f -> (short) f.number().numberBetween(1, 1000)));
+
+            // Date/Time types per schema.org
+            Instant pastInstant = faker.timeAndDate().past(365, TimeUnit.DAYS);
+            values.put("dateCreated", generateNullableValue(nullPercentage, faker, f -> new Date(pastInstant.toEpochMilli())));
+            values.put("lastLogin", generateNullableValue(nullPercentage, faker,
+                    f -> new Time(f.timeAndDate().past(1, TimeUnit.DAYS).toEpochMilli())));
+            // Clamp to the creation instant: an independent draw could fall before the record was created
+            values.put("dateModified", generateNullableValue(nullPercentage, faker, f -> {
+                final long modifiedMillis = f.timeAndDate().past(7, TimeUnit.DAYS).toEpochMilli();
+                return new Timestamp(Math.max(modifiedMillis, pastInstant.toEpochMilli()));
+            }));
+
+            // Array of strings (keywords) using Faker's word provider
+            int keywordCount = faker.number().numberBetween(1, 5);
+            String[] keywords = new String[keywordCount];
+            for (int i = 0; i < keywordCount; i++) {
+                keywords[i] = faker.word().adjective();
+            }
+            values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords));
+
+            // Array of integers
+            int scoreCount = faker.number().numberBetween(3, 8);
+            Integer[] scores = new Integer[scoreCount];
+            for (int i = 0; i < scoreCount; i++) {
+                scores[i] = faker.number().numberBetween(50, 100);
+            }
+            values.put("scores", generateNullableValue(nullPercentage, faker, f -> scores));
+
+            // Map (additionalProperty) using Faker providers
+            Map<String, String> additionalProperty = new HashMap<>();
+            additionalProperty.put("source", faker.app().name().toLowerCase().replace(" ", "-"));
+            additionalProperty.put("version", faker.app().version());
+            additionalProperty.put("environment", faker.options().option("dev", "staging", "prod"));
+            // Use Faker's AWS provider for region
+            additionalProperty.put("region", faker.aws().region());
+            values.put("additionalProperty", generateNullableValue(nullPercentage, faker, f -> additionalProperty));
+
+            // Nested person record (3 levels deep: person -> address -> geo)
+            final DataType personType = schema.getField("person").get().getDataType();
+            RecordSchema personSchema = personType.getFieldType() == RecordFieldType.RECORD
+                    ? ((RecordDataType) personType).getChildSchema()
+                    : null;
+
+            if (personSchema != null) {
+                Map<String, Object> personValues = new LinkedHashMap<>();
+                personValues.put("givenName", faker.name().firstName());
+                personValues.put("familyName", faker.name().lastName());
+                personValues.put("email", faker.internet().emailAddress());
+                personValues.put("age", faker.number().numberBetween(18, 80));
+                personValues.put("verified", faker.bool().bool());
+
+                final DataType addressType = personSchema.getField("address").get().getDataType();
+                RecordSchema addressSchema = addressType.getFieldType() == RecordFieldType.RECORD
+                        ? ((RecordDataType) addressType).getChildSchema()
+                        : null;
+
+                if (addressSchema != null) {
+                    Map<String, Object> addressValues = new LinkedHashMap<>();
+                    addressValues.put("streetAddress", faker.address().streetAddress());
+                    addressValues.put("addressLocality", faker.address().city());
+                    addressValues.put("addressRegion", faker.address().state());
+                    addressValues.put("postalCode", faker.address().zipCode());
+                    addressValues.put("addressCountry", faker.address().country());
+
+                    final DataType geoType = addressSchema.getField("geo").get().getDataType();
+                    RecordSchema geoSchema = geoType.getFieldType() == RecordFieldType.RECORD
+                            ? ((RecordDataType) geoType).getChildSchema()
+                            : null;
+
+                    if (geoSchema != null) {
+                        Map<String, Object> geoValues = new LinkedHashMap<>();
+                        geoValues.put("latitude", faker.number().randomDouble(6, -90, 90));
+                        geoValues.put("longitude", faker.number().randomDouble(6, -180, 180));
+                        addressValues.put("geo", new MapRecord(geoSchema, geoValues));
+                    }
+
+                    personValues.put("address", new MapRecord(addressSchema, addressValues));
+                }
+
+                values.put("person", generateNullableValue(nullPercentage, faker, f -> new MapRecord(personSchema, personValues)));
+            }
+
+            // Array of order records (orderedItem): unwrap ARRAY -> RECORD to reach the element schema
+            RecordSchema orderSchema = null;
+            final DataType orderedItemType = schema.getField("orderedItem").get().getDataType();
+            if (orderedItemType.getFieldType() == RecordFieldType.ARRAY) {
+                DataType elementType = ((ArrayDataType) orderedItemType).getElementType();
+                if (elementType.getFieldType() == RecordFieldType.RECORD) {
+                    orderSchema = ((RecordDataType) elementType).getChildSchema();
+                }
+            }
+
+            if (orderSchema != null) {
+                int orderCount = faker.number().numberBetween(1, 4);
+                Object[] orders = new Object[orderCount];
+                for (int i = 0; i < orderCount; i++) {
+                    Map<String, Object> orderValues = new LinkedHashMap<>();
+                    orderValues.put("orderNumber", "ORD-" + faker.number().digits(8));
+                    orderValues.put("totalPrice", faker.number().randomDouble(2, 10, 500));
+                    // Use Faker's money provider for currency codes
+                    orderValues.put("priceCurrency", faker.money().currencyCode());
+                    orderValues.put("orderDate", new Date(faker.timeAndDate().past(90, TimeUnit.DAYS).toEpochMilli()));
+                    orderValues.put("isGift", faker.bool().bool());
+                    orders[i] = new MapRecord(orderSchema, orderValues);
+                }
+                values.put("orderedItem", generateNullableValue(nullPercentage, faker, f -> orders));
+            }
+
+            return values;
+        }
+    };
+
+    /** Label returned by {@code getDisplayName()}. */
+    private final String displayName;
+    /** Explanatory text returned by {@code getDescription()}. */
+    private final String description;
+
+    /**
+     * Stores the presentation strings for an enum constant.
+     *
+     * @param displayName the label for this schema
+     * @param description the explanatory text for this schema
+     */
+    PredefinedRecordSchema(final String displayName, final String description) {
+        this.description = description;
+        this.displayName = displayName;
+    }
+
+ /**
+  * Returns the enum constant name; this is the same string that {@code fromName} resolves back to the constant.
+  */
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ /**
+  * Returns the display name supplied to the constructor.
+  */
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ /**
+  * Returns the description supplied to the constructor.
+  */
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ /**
+ * Get the record schema for this predefined schema type.
+ * Each constant builds a new schema on every call; the implementations visible in this
+ * file pass the flag to every field they declare, including fields of nested records.
+ *
+ * @param nullable whether fields should be nullable
+ * @return the RecordSchema
+ */
+ public abstract RecordSchema getSchema(boolean nullable);
+
+ /**
+ * Generate random values for all fields in this schema.
+ * The map is keyed by field name and is intended to back a MapRecord built on the same
+ * schema, as done by generateRecord.
+ *
+ * @param faker the Faker instance to use for generating random data
+ * @param schema the RecordSchema to generate values for
+ * @param nullPercentage the percentage chance (0-100) that nullable fields will be null
+ * @return a Map of field names to generated values
+ */
+ public abstract Map<String, Object> generateValues(Faker faker,
RecordSchema schema, int nullPercentage);
+
+    /**
+     * Generate a record with random values.
+     * The null percentage only takes effect when {@code nullable} is true; for a non-nullable
+     * schema it is treated as 0 so that no field declared non-nullable ever receives a null.
+     *
+     * @param faker the Faker instance to use for generating random data
+     * @param nullable whether fields should be nullable
+     * @param nullPercentage the percentage chance (0-100) that nullable fields will be null
+     * @return a Record with generated values
+     */
+    public Record generateRecord(Faker faker, boolean nullable, int nullPercentage) {
+        final RecordSchema schema = getSchema(nullable);
+        // Guard against a stale percentage being passed along with nullable=false
+        final int effectiveNullPercentage = nullable ? nullPercentage : 0;
+        final Map<String, Object> values = generateValues(faker, schema, effectiveNullPercentage);
+        return new MapRecord(schema, values);
+    }
+
+ /**
+ * Helper method to generate a value with a chance of being null.
+ */
+ protected static <T> T generateNullableValue(int nullPercentage, Faker
faker, Function<Faker, T> generator) {
+ if (nullPercentage > 0 && faker.number().numberBetween(0, 100) <
nullPercentage) {
+ return null;
+ }
+ return generator.apply(faker);
+ }
+
+    /**
+     * Look up a predefined schema by its enum constant name.
+     * The comparison is exact and case-sensitive.
+     *
+     * @param name the constant name of the predefined schema
+     * @return the matching PredefinedRecordSchema, or null when the name is null, empty or matches no constant
+     */
+    public static PredefinedRecordSchema fromName(String name) {
+        if (name != null && !name.isEmpty()) {
+            for (final PredefinedRecordSchema candidate : values()) {
+                if (candidate.name().equals(name)) {
+                    return candidate;
+                }
+            }
+        }
+        return null;
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
index 399a394cd6..bd111cf856 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
@@ -23,6 +23,8 @@ import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.processors.standard.faker.FakerMethodHolder;
import org.apache.nifi.processors.standard.faker.FakerUtils;
+import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
@@ -158,6 +160,9 @@ public class TestGenerateRecord {
testRunner.setProperty(field.getName().toLowerCase(Locale.ROOT),
((AllowableValue) field.get(processor)).getValue());
}
+ // Add at least one dynamic property to satisfy schema configuration
validation
+ testRunner.setProperty("testField", "Name.fullName");
+
final Map<String, String> recordFields =
processor.getFields(testRunner.getProcessContext());
final RecordSchema outputSchema =
processor.generateRecordSchema(recordFields, true);
final MockRecordWriter recordWriter = new MockRecordWriter(null, true,
-1, false, outputSchema);
@@ -301,4 +306,127 @@ public class TestGenerateRecord {
final PropertyMigrationResult propertyMigrationResult =
testRunner.migrateProperties();
assertEquals(expectedRenamed,
propertyMigrationResult.getPropertiesRenamed());
}
+
+    @Test
+    public void testValidationFailsWithNoSchemaConfiguration() throws InitializationException {
+        // This test configures only the writer and the record count; no schema source is set here
+        final String writerId = "record-writer";
+        final MockRecordWriter writer = new MockRecordWriter(null, true);
+        testRunner.addControllerService(writerId, writer);
+        testRunner.enableControllerService(writer);
+
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, writerId);
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testValidationFailsWithMultipleSchemaConfigurations() throws Exception {
+        // Files.readString decodes as UTF-8, so the schema text does not depend on the platform default charset
+        final String schemaText = Files.readString(Paths.get("src/test/resources/TestGenerateRecord/nested_nullable.avsc"));
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(null, true);
+        testRunner.addControllerService("record-writer", recordWriter);
+        testRunner.enableControllerService(recordWriter);
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+        // Set both Schema Text and Predefined Schema - should be invalid
+        testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText);
+        testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, PredefinedRecordSchema.PERSON.name());
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testValidationFailsWithPredefinedSchemaAndDynamicProperties() throws Exception {
+        final String writerId = "record-writer";
+        final MockRecordWriter writer = new MockRecordWriter(null, true);
+        testRunner.addControllerService(writerId, writer);
+        testRunner.enableControllerService(writer);
+
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, writerId);
+
+        // A predefined schema combined with a dynamic field property must be rejected
+        final String predefinedName = PredefinedRecordSchema.PERSON.name();
+        testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, predefinedName);
+        testRunner.setProperty("myField", "Address.fullAddress");
+
+        testRunner.assertNotValid();
+    }
+
+ // The tests below each run the shared testPredefinedSchema helper for one predefined schema,
+ // passing the number of records to generate.
+
+ // PERSON schema, 5 records
+ @Test
+ public void testPredefinedSchemaPerson() throws Exception {
+ testPredefinedSchema(PredefinedRecordSchema.PERSON, 5);
+ }
+
+ // ORDER schema, 5 records
+ @Test
+ public void testPredefinedSchemaOrder() throws Exception {
+ testPredefinedSchema(PredefinedRecordSchema.ORDER, 5);
+ }
+
+ // EVENT schema, 5 records
+ @Test
+ public void testPredefinedSchemaEvent() throws Exception {
+ testPredefinedSchema(PredefinedRecordSchema.EVENT, 5);
+ }
+
+ // SENSOR schema, 5 records
+ @Test
+ public void testPredefinedSchemaSensor() throws Exception {
+ testPredefinedSchema(PredefinedRecordSchema.SENSOR, 5);
+ }
+
+ // PRODUCT schema, 5 records
+ @Test
+ public void testPredefinedSchemaProduct() throws Exception {
+ testPredefinedSchema(PredefinedRecordSchema.PRODUCT, 5);
+ }
+
+ // STOCK_TRADE schema, 5 records
+ @Test
+ public void testPredefinedSchemaStockTrade() throws Exception {
+ testPredefinedSchema(PredefinedRecordSchema.STOCK_TRADE, 5);
+ }
+
+ // COMPLETE_EXAMPLE schema, 3 records
+ @Test
+ public void testPredefinedSchemaCompleteExample() throws Exception {
+ testPredefinedSchema(PredefinedRecordSchema.COMPLETE_EXAMPLE, 3);
+ }
+
+    /**
+     * Runs the processor once with the given predefined schema (nullable fields, null
+     * percentage 0) and checks that a single FlowFile reporting the requested record count is produced.
+     */
+    private void testPredefinedSchema(PredefinedRecordSchema predefinedSchema, int numRecords) throws Exception {
+        final String writerId = "record-writer";
+        final String expectedCount = String.valueOf(numRecords);
+
+        final MockRecordWriter writer = new MockRecordWriter(null, true, -1, false, predefinedSchema.getSchema(true));
+        testRunner.addControllerService(writerId, writer);
+        testRunner.enableControllerService(writer);
+
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, writerId);
+        testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, predefinedSchema.name());
+        testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true");
+        testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0");
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, expectedCount);
+
+        testRunner.assertValid();
+        testRunner.run();
+
+        testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+        final MockFlowFile output = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).getFirst();
+
+        // Verify record count attribute
+        output.assertAttributeEquals("record.count", expectedCount);
+    }
+
+    @Test
+    public void testPredefinedSchemaWithNullPercentage() throws Exception {
+        final String writerId = "record-writer";
+        final PredefinedRecordSchema person = PredefinedRecordSchema.PERSON;
+
+        final MockRecordWriter writer = new MockRecordWriter(null, true, -1, false, person.getSchema(true));
+        testRunner.addControllerService(writerId, writer);
+        testRunner.enableControllerService(writer);
+
+        // Null percentage of 100 with nullable fields enabled
+        testRunner.setProperty(GenerateRecord.RECORD_WRITER, writerId);
+        testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, person.name());
+        testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true");
+        testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "100");
+        testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+        testRunner.assertValid();
+        testRunner.run();
+
+        testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+        final MockFlowFile output = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).getFirst();
+
+        // Verify record count attribute
+        output.assertAttributeEquals("record.count", "1");
+    }
}