This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e33a8f7  [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate vali… (#2045)
e33a8f7 is described below

commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f
Author: Balajee Nagasubramaniam <[email protected]>
AuthorDate: Tue Dec 29 13:33:19 2020 -0800

    [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate vali… (#2045)
    
    * [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid timestamps
    
    Co-authored-by: Sivabalan Narayanan <[email protected]>
---
 .../integ/testsuite/generator/DeltaGenerator.java  |  4 +-
 .../FlexibleSchemaRecordGenerationIterator.java    | 23 +++++++-----
 .../GenericRecordFullPayloadGenerator.java         | 43 ++++++++++++++++------
 .../TestGenericRecordPayloadGenerator.java         | 35 ++++++++++++++++++
 4 files changed, 83 insertions(+), 22 deletions(-)

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 6242cbf..30b2d6c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -129,7 +129,7 @@ public class DeltaGenerator implements Serializable {
 
   public JavaRDD<GenericRecord> generateInserts(Config operation) {
     int numPartitions = operation.getNumInsertPartitions();
-    long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
+    long recordsPerPartition = operation.getNumRecordsInsert();
     int minPayloadSize = operation.getRecordSize();
     int startPartition = operation.getStartPartition();
 
@@ -140,7 +140,7 @@ public class DeltaGenerator implements Serializable {
     JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, 
numPartitions)
         .mapPartitionsWithIndex((index, p) -> {
           return new LazyRecordGeneratorIterator(new 
FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
-            minPayloadSize, schemaStr, partitionPathFieldNames, 
(Integer)index));
+              minPayloadSize, schemaStr, partitionPathFieldNames, 
numPartitions));
         }, true);
 
     if (deltaOutputConfig.getInputParallelism() < numPartitions) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
index 5477371..787ec84 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
@@ -20,11 +20,6 @@ package org.apache.hudi.integ.testsuite.generator;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.List;
 
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,17 +41,21 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
   private GenericRecord lastRecord;
   // Partition path field name
   private Set<String> partitionPathFieldNames;
+  private String firstPartitionPathField;
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, 
String schema) {
     this(maxEntriesToProduce, 
GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
   }
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int 
minPayloadSize, String schemaStr,
-      List<String> partitionPathFieldNames, int partitionIndex) {
+      List<String> partitionPathFieldNames, int numPartitions) {
     this.counter = maxEntriesToProduce;
     this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
+    if(partitionPathFieldNames != null && partitionPathFieldNames.size() > 0) {
+      this.firstPartitionPathField = partitionPathFieldNames.get(0);
+    }
     Schema schema = new Schema.Parser().parse(schemaStr);
-    this.generator = new GenericRecordFullPayloadGenerator(schema, 
minPayloadSize, partitionIndex);
+    this.generator = new GenericRecordFullPayloadGenerator(schema, 
minPayloadSize, numPartitions);
   }
 
   @Override
@@ -67,12 +66,18 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
   @Override
   public GenericRecord next() {
     this.counter--;
+    boolean partitionPathsNonEmpty = partitionPathFieldNames != null && 
partitionPathFieldNames.size() > 0;
     if (lastRecord == null) {
-      GenericRecord record = 
this.generator.getNewPayload(partitionPathFieldNames);
+      GenericRecord record = partitionPathsNonEmpty
+          ? 
this.generator.getNewPayloadWithTimestamp(this.firstPartitionPathField)
+          : this.generator.getNewPayload();
       lastRecord = record;
       return record;
     } else {
-      return this.generator.randomize(lastRecord, 
this.partitionPathFieldNames);
+      return partitionPathsNonEmpty
+          ? this.generator.getUpdatePayloadWithTimestamp(lastRecord,
+          this.partitionPathFieldNames, firstPartitionPathField)
+          : this.generator.getUpdatePayload(lastRecord, 
this.partitionPathFieldNames);
     }
   }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index 49a5f31..510fc49 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -46,9 +46,9 @@ import java.util.concurrent.TimeUnit;
  */
 public class GenericRecordFullPayloadGenerator implements Serializable {
 
-  private static Logger LOG = 
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
   public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
+  public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
   public static final String DEFAULT_HOODIE_IS_DELETED_COL = 
"_hoodie_is_deleted";
   protected final Random random = new Random();
   // The source schema used to generate a payload
@@ -58,10 +58,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
   // The index of partition for which records are being generated
   private int partitionIndex = 0;
   // The size of a full record where every field of a generic record created 
contains 1 random value
-  private final int estimatedFullPayloadSize;
+  private int estimatedFullPayloadSize;
   // Number of extra entries to add in a complex/collection field to achieve 
the desired record size
   Map<String, Integer> extraEntriesMap = new HashMap<>();
 
+  // The number of unique dates to create
+  private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
   // LogicalTypes in Avro 1.8.2
   private static final String DECIMAL = "decimal";
   private static final String UUID_NAME = "uuid";
@@ -75,6 +77,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
     this(schema, DEFAULT_PAYLOAD_SIZE);
   }
 
+  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, 
int numDatePartitions) {
+    this(schema, minPayloadSize);
+    this.numDatePartitions = numDatePartitions;
+  }
+
   public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) {
     Pair<Integer, Integer> sizeInfo = new 
GenericRecordFullPayloadSizeEstimator(schema)
         .typeEstimateAndNumComplexFields();
@@ -83,19 +90,13 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
     if (estimatedFullPayloadSize < minPayloadSize) {
       int numberOfComplexFields = sizeInfo.getRight();
       if (numberOfComplexFields < 1) {
-        LOG.warn("The schema does not have any collections/complex fields. "
-            + "Cannot achieve minPayloadSize => " + minPayloadSize);
+        LOG.warn("The schema does not have any collections/complex fields. 
Cannot achieve minPayloadSize : {}",
+            minPayloadSize);
       }
-
       determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - 
estimatedFullPayloadSize);
     }
   }
 
-  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, 
int partitionIndex) {
-    this(schema, minPayloadSize);
-    this.partitionIndex = partitionIndex;
-  }
-
   protected static boolean isPrimitive(Schema localSchema) {
     if (localSchema.getType() != Type.ARRAY
         && localSchema.getType() != Type.MAP
@@ -131,6 +132,15 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
     return create(baseSchema, partitionPathFieldNames);
   }
 
+  public GenericRecord getNewPayloadWithTimestamp(String tsFieldName) {
+    return updateTimestamp(create(baseSchema, null), tsFieldName);
+  }
+
+  public GenericRecord getUpdatePayloadWithTimestamp(GenericRecord record, 
Set<String> blacklistFields,
+                                                     String tsFieldName) {
+    return updateTimestamp(randomize(record, blacklistFields), tsFieldName);
+  }
+
   protected GenericRecord create(Schema schema, Set<String> 
partitionPathFieldNames) {
     GenericRecord result = new GenericData.Record(schema);
     for (Schema.Field f : schema.getFields()) {
@@ -314,6 +324,17 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
     return genericData.validate(baseSchema, record);
   }
 
+  /*
+   * Generates a sequential timestamp (daily increment), and updates the 
timestamp field of the record.
+   * Note: When generating records, number of records to be generated must be 
more than numDatePartitions * parallelism,
+   * to guarantee that at least numDatePartitions are created.
+   */
+  public GenericRecord updateTimestamp(GenericRecord record, String fieldName) 
{
+    long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex % 
numDatePartitions, TimeUnit.DAYS);
+    record.put(fieldName, System.currentTimeMillis() - delta);
+    return record;
+  }
+
   /**
    * Check whether a schema is option. return true if it match the follows: 1. 
Its type is Type.UNION 2. Has two types 3. Has a NULL type.
    */
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
index 9451595..2b3a65c 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -133,4 +134,38 @@ public class TestGenericRecordPayloadGenerator {
     assertTrue(HoodieAvroUtils.avroToBytes(record).length < minPayloadSize + 
0.1 * minPayloadSize);
   }
 
+  @Test
+  public void testUpdatePayloadGeneratorWithTimestamp() throws IOException {
+    Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
+        .readFileFromAbsolutePath(System.getProperty("user.dir") + "/.." + 
SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
+    GenericRecordFullPayloadGenerator payloadGenerator = new 
GenericRecordFullPayloadGenerator(schema);
+    List<String> insertRowKeys = new ArrayList<>();
+    List<String> updateRowKeys = new ArrayList<>();
+    List<Long> insertTimeStamps = new ArrayList<>();
+    List<Long> updateTimeStamps = new ArrayList<>();
+    List<GenericRecord> records = new ArrayList<>();
+    Long startMillis = System.currentTimeMillis() - TimeUnit.MILLISECONDS
+        
.convert(GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS, 
TimeUnit.DAYS);
+
+    // Generate 10 new records
+    IntStream.range(0, 10).forEach(a -> {
+      GenericRecord record = 
payloadGenerator.getNewPayloadWithTimestamp("timestamp");
+      records.add(record);
+      insertRowKeys.add(record.get("_row_key").toString());
+      insertTimeStamps.add((Long) record.get("timestamp"));
+    });
+    Set<String> blacklistFields = new HashSet<>(Arrays.asList("_row_key"));
+    records.stream().forEach(a -> {
+      // Generate 10 updated records
+      GenericRecord record = payloadGenerator.getUpdatePayloadWithTimestamp(a, 
blacklistFields, "timestamp");
+      updateRowKeys.add(record.get("_row_key").toString());
+      updateTimeStamps.add((Long) record.get("timestamp"));
+    });
+    // The row keys from insert payloads should match all the row keys from 
the update payloads
+    assertTrue(insertRowKeys.containsAll(updateRowKeys));
+    // The timestamp field for the insert payloads should not all match with 
the update payloads
+    assertFalse(insertTimeStamps.containsAll(updateTimeStamps));
+    Long currentMillis = System.currentTimeMillis();
+    assertTrue(insertTimeStamps.stream().allMatch(t -> t >= startMillis  && t 
<= currentMillis));
+  }
 }

Reply via email to