This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e33a8f7 [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid timestamps (#2045)
e33a8f7 is described below
commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f
Author: Balajee Nagasubramaniam <[email protected]>
AuthorDate: Tue Dec 29 13:33:19 2020 -0800
[HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid timestamps (#2045)
* [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid
timestamps
Co-authored-by: Sivabalan Narayanan <[email protected]>
---
.../integ/testsuite/generator/DeltaGenerator.java | 4 +-
.../FlexibleSchemaRecordGenerationIterator.java | 23 +++++++-----
.../GenericRecordFullPayloadGenerator.java | 43 ++++++++++++++++------
.../TestGenericRecordPayloadGenerator.java | 35 ++++++++++++++++++
4 files changed, 83 insertions(+), 22 deletions(-)
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 6242cbf..30b2d6c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -129,7 +129,7 @@ public class DeltaGenerator implements Serializable {
public JavaRDD<GenericRecord> generateInserts(Config operation) {
int numPartitions = operation.getNumInsertPartitions();
- long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
+ long recordsPerPartition = operation.getNumRecordsInsert();
int minPayloadSize = operation.getRecordSize();
int startPartition = operation.getStartPartition();
@@ -140,7 +140,7 @@ public class DeltaGenerator implements Serializable {
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes,
numPartitions)
.mapPartitionsWithIndex((index, p) -> {
return new LazyRecordGeneratorIterator(new
FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
- minPayloadSize, schemaStr, partitionPathFieldNames,
(Integer)index));
+ minPayloadSize, schemaStr, partitionPathFieldNames,
numPartitions));
}, true);
if (deltaOutputConfig.getInputParallelism() < numPartitions) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
index 5477371..787ec84 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
@@ -20,11 +20,6 @@ package org.apache.hudi.integ.testsuite.generator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.List;
import java.util.HashSet;
import java.util.Iterator;
@@ -46,17 +41,21 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
private GenericRecord lastRecord;
// Partition path field name
private Set<String> partitionPathFieldNames;
+ private String firstPartitionPathField;
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce,
String schema) {
this(maxEntriesToProduce,
GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
}
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int
minPayloadSize, String schemaStr,
- List<String> partitionPathFieldNames, int partitionIndex) {
+ List<String> partitionPathFieldNames, int numPartitions) {
this.counter = maxEntriesToProduce;
this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
+ if(partitionPathFieldNames != null && partitionPathFieldNames.size() > 0) {
+ this.firstPartitionPathField = partitionPathFieldNames.get(0);
+ }
Schema schema = new Schema.Parser().parse(schemaStr);
- this.generator = new GenericRecordFullPayloadGenerator(schema,
minPayloadSize, partitionIndex);
+ this.generator = new GenericRecordFullPayloadGenerator(schema,
minPayloadSize, numPartitions);
}
@Override
@@ -67,12 +66,18 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
@Override
public GenericRecord next() {
this.counter--;
+ boolean partitionPathsNonEmpty = partitionPathFieldNames != null &&
partitionPathFieldNames.size() > 0;
if (lastRecord == null) {
- GenericRecord record =
this.generator.getNewPayload(partitionPathFieldNames);
+ GenericRecord record = partitionPathsNonEmpty
+ ?
this.generator.getNewPayloadWithTimestamp(this.firstPartitionPathField)
+ : this.generator.getNewPayload();
lastRecord = record;
return record;
} else {
- return this.generator.randomize(lastRecord,
this.partitionPathFieldNames);
+ return partitionPathsNonEmpty
+ ? this.generator.getUpdatePayloadWithTimestamp(lastRecord,
+ this.partitionPathFieldNames, firstPartitionPathField)
+ : this.generator.getUpdatePayload(lastRecord,
this.partitionPathFieldNames);
}
}
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index 49a5f31..510fc49 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -46,9 +46,9 @@ import java.util.concurrent.TimeUnit;
*/
public class GenericRecordFullPayloadGenerator implements Serializable {
- private static Logger LOG =
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
-
+ private static final Logger LOG =
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
+ public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
public static final String DEFAULT_HOODIE_IS_DELETED_COL =
"_hoodie_is_deleted";
protected final Random random = new Random();
// The source schema used to generate a payload
@@ -58,10 +58,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
// The index of partition for which records are being generated
private int partitionIndex = 0;
// The size of a full record where every field of a generic record created
contains 1 random value
- private final int estimatedFullPayloadSize;
+ private int estimatedFullPayloadSize;
// Number of extra entries to add in a complex/collection field to achieve
the desired record size
Map<String, Integer> extraEntriesMap = new HashMap<>();
+ // The number of unique dates to create
+ private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
// LogicalTypes in Avro 1.8.2
private static final String DECIMAL = "decimal";
private static final String UUID_NAME = "uuid";
@@ -75,6 +77,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
this(schema, DEFAULT_PAYLOAD_SIZE);
}
+ public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize,
int numDatePartitions) {
+ this(schema, minPayloadSize);
+ this.numDatePartitions = numDatePartitions;
+ }
+
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) {
Pair<Integer, Integer> sizeInfo = new
GenericRecordFullPayloadSizeEstimator(schema)
.typeEstimateAndNumComplexFields();
@@ -83,19 +90,13 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
if (estimatedFullPayloadSize < minPayloadSize) {
int numberOfComplexFields = sizeInfo.getRight();
if (numberOfComplexFields < 1) {
- LOG.warn("The schema does not have any collections/complex fields. "
- + "Cannot achieve minPayloadSize => " + minPayloadSize);
+ LOG.warn("The schema does not have any collections/complex fields.
Cannot achieve minPayloadSize : {}",
+ minPayloadSize);
}
-
determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize -
estimatedFullPayloadSize);
}
}
- public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize,
int partitionIndex) {
- this(schema, minPayloadSize);
- this.partitionIndex = partitionIndex;
- }
-
protected static boolean isPrimitive(Schema localSchema) {
if (localSchema.getType() != Type.ARRAY
&& localSchema.getType() != Type.MAP
@@ -131,6 +132,15 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return create(baseSchema, partitionPathFieldNames);
}
+ public GenericRecord getNewPayloadWithTimestamp(String tsFieldName) {
+ return updateTimestamp(create(baseSchema, null), tsFieldName);
+ }
+
+ public GenericRecord getUpdatePayloadWithTimestamp(GenericRecord record,
Set<String> blacklistFields,
+ String tsFieldName) {
+ return updateTimestamp(randomize(record, blacklistFields), tsFieldName);
+ }
+
protected GenericRecord create(Schema schema, Set<String>
partitionPathFieldNames) {
GenericRecord result = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) {
@@ -314,6 +324,17 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return genericData.validate(baseSchema, record);
}
+ /*
+ * Generates a sequential timestamp (daily increment), and updates the
timestamp field of the record.
+ * Note: When generating records, number of records to be generated must be
more than numDatePartitions * parallelism,
+ * to guarantee that at least numDatePartitions are created.
+ */
+ public GenericRecord updateTimestamp(GenericRecord record, String fieldName)
{
+ long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex %
numDatePartitions, TimeUnit.DAYS);
+ record.put(fieldName, System.currentTimeMillis() - delta);
+ return record;
+ }
+
/**
* Check whether a schema is option. return true if it match the follows: 1.
Its type is Type.UNION 2. Has two types 3. Has a NULL type.
*/
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
index 9451595..2b3a65c 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -133,4 +134,38 @@ public class TestGenericRecordPayloadGenerator {
assertTrue(HoodieAvroUtils.avroToBytes(record).length < minPayloadSize +
0.1 * minPayloadSize);
}
+ @Test
+ public void testUpdatePayloadGeneratorWithTimestamp() throws IOException {
+ Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
+ .readFileFromAbsolutePath(System.getProperty("user.dir") + "/.." +
SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
+ GenericRecordFullPayloadGenerator payloadGenerator = new
GenericRecordFullPayloadGenerator(schema);
+ List<String> insertRowKeys = new ArrayList<>();
+ List<String> updateRowKeys = new ArrayList<>();
+ List<Long> insertTimeStamps = new ArrayList<>();
+ List<Long> updateTimeStamps = new ArrayList<>();
+ List<GenericRecord> records = new ArrayList<>();
+ Long startMillis = System.currentTimeMillis() - TimeUnit.MILLISECONDS
+
.convert(GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS,
TimeUnit.DAYS);
+
+ // Generate 10 new records
+ IntStream.range(0, 10).forEach(a -> {
+ GenericRecord record =
payloadGenerator.getNewPayloadWithTimestamp("timestamp");
+ records.add(record);
+ insertRowKeys.add(record.get("_row_key").toString());
+ insertTimeStamps.add((Long) record.get("timestamp"));
+ });
+ Set<String> blacklistFields = new HashSet<>(Arrays.asList("_row_key"));
+ records.stream().forEach(a -> {
+ // Generate 10 updated records
+ GenericRecord record = payloadGenerator.getUpdatePayloadWithTimestamp(a,
blacklistFields, "timestamp");
+ updateRowKeys.add(record.get("_row_key").toString());
+ updateTimeStamps.add((Long) record.get("timestamp"));
+ });
+ // The row keys from insert payloads should match all the row keys from
the update payloads
+ assertTrue(insertRowKeys.containsAll(updateRowKeys));
+ // The timestamp field for the insert payloads should not all match with
the update payloads
+ assertFalse(insertTimeStamps.containsAll(updateTimeStamps));
+ Long currentMillis = System.currentTimeMillis();
+ assertTrue(insertTimeStamps.stream().allMatch(t -> t >= startMillis && t
<= currentMillis));
+ }
}