This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6310a23  [HUDI-1351] Improvements to the hudi test suite for 
scalability and repeated testing. (#2197)
6310a23 is described below

commit 6310a2307abba94c7ff8a770f45462deae2c312e
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Oct 29 06:50:37 2020 -0700

    [HUDI-1351] Improvements to the hudi test suite for scalability and 
repeated testing. (#2197)
    
    1. Added the --clean-input and --clean-output parameters to clean the input 
and output directories before starting the job
    2. Added the --delete-old-input parameter to delete older batches for data 
already ingested. This helps keep the number of redundant files low.
    3. Added the --input-parallelism parameter to restrict the parallelism when 
generating input data. This helps keep the number of generated input files 
low.
    4. Added a start offset option (start_offset; config key start_partition) to 
DAG nodes. Without the ability to specify a start offset, data is generated into 
existing partitions. With a start offset, the DAG can control which partition 
the data is written to.
    5. Fixed generation of records for the correct number of partitions
      - In the existing implementation, the partition is chosen as a random 
long. This does not guarantee that the exact number of requested partitions is 
created.
    6. Changed variable blacklistedFields to be a Set as that is faster than 
List for membership checks.
    7. Fixed integer division for Math.ceil. If two integers are divided, the 
result is not a double unless one of the integers is cast to double.
---
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   | 29 +++++++++
 .../testsuite/configuration/DFSDeltaConfig.java    | 17 +++++-
 .../integ/testsuite/configuration/DeltaConfig.java | 12 +++-
 .../hudi/integ/testsuite/dag/WriterContext.java    |  3 +-
 .../integ/testsuite/generator/DeltaGenerator.java  | 48 ++++++++++++---
 .../FlexibleSchemaRecordGenerationIterator.java    | 15 +++--
 .../GenericRecordFullPayloadGenerator.java         | 68 +++++++++++++++++-----
 .../generator/UpdateGeneratorIterator.java         |  8 ++-
 .../reader/DFSHoodieDatasetInputReader.java        | 26 ++++++---
 .../TestDFSHoodieTestSuiteWriterAdapter.java       |  2 +-
 .../TestGenericRecordPayloadGenerator.java         |  5 +-
 11 files changed, 187 insertions(+), 46 deletions(-)

diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index c2c242a..7b3324e 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -96,6 +96,20 @@ public class HoodieTestSuiteJob {
       HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), 
cfg.targetBasePath,
           HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, 
"archived");
     }
+
+    if (cfg.cleanInput) {
+      Path inputPath = new Path(cfg.inputBasePath);
+      if (fs.exists(inputPath)) {
+        fs.delete(inputPath, true);
+      }
+    }
+
+    if (cfg.cleanOutput) {
+      Path outputPath = new Path(cfg.targetBasePath);
+      if (fs.exists(outputPath)) {
+        fs.delete(outputPath, true);
+      }
+    }
   }
 
   private static HiveConf getDefaultHiveConf(Configuration cfg) {
@@ -175,9 +189,24 @@ public class HoodieTestSuiteJob {
         required = true)
     public Long limitFileSize = 1024 * 1024 * 120L;
 
+    @Parameter(names = {"--input-parallelism"}, description = "Parallelism to 
use when generation input files",
+        required = false)
+    public Integer inputParallelism = 0;
+
+    @Parameter(names = {"--delete-old-input"}, description = "Delete older 
input files once they have been ingested",
+        required = false)
+    public Boolean deleteOldInput = false;
+
     @Parameter(names = {"--use-deltastreamer"}, description = "Choose whether 
to use HoodieDeltaStreamer to "
         + "perform ingestion. If set to false, HoodieWriteClient will be used")
     public Boolean useDeltaStreamer = false;
 
+    @Parameter(names = {"--clean-input"}, description = "Clean the input 
folders and delete all files within it "
+        + "before starting the Job")
+    public Boolean cleanInput = false;
+
+    @Parameter(names = {"--clean-output"}, description = "Clean the output 
folders and delete all files within it "
+        + "before starting the Job")
+    public Boolean cleanOutput = false;
   }
 }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
index 2915628..0ac3668 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
@@ -36,15 +36,22 @@ public class DFSDeltaConfig extends DeltaConfig {
   private final Long maxFileSize;
   // The current batch id
   private Integer batchId;
+  // Paralleism to use when generating input data
+  private int inputParallelism;
+  // Whether to delete older input data once it has been ingested
+  private boolean deleteOldInputData;
 
   public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType 
deltaInputType,
                         SerializableConfiguration configuration,
-                        String deltaBasePath, String targetBasePath, String 
schemaStr, Long maxFileSize) {
+                        String deltaBasePath, String targetBasePath, String 
schemaStr, Long maxFileSize,
+                        int inputParallelism, boolean deleteOldInputData) {
     super(deltaOutputMode, deltaInputType, configuration);
     this.deltaBasePath = deltaBasePath;
     this.schemaStr = schemaStr;
     this.maxFileSize = maxFileSize;
     this.datasetOutputPath = targetBasePath;
+    this.inputParallelism = inputParallelism;
+    this.deleteOldInputData = deleteOldInputData;
   }
 
   public String getDeltaBasePath() {
@@ -70,4 +77,12 @@ public class DFSDeltaConfig extends DeltaConfig {
   public void setBatchId(Integer batchId) {
     this.batchId = batchId;
   }
+
+  public int getInputParallelism() {
+    return inputParallelism;
+  }
+
+  public boolean shouldDeleteOldInputData() {
+    return deleteOldInputData;
+  }
 }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 7a66681..db15604 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -83,6 +83,7 @@ public class DeltaConfig implements Serializable {
     private static String DISABLE_INGEST = "disable_ingest";
     private static String HIVE_LOCAL = "hive_local";
     private static String REINIT_CONTEXT = "reinitialize_context";
+    private static String START_PARTITION = "start_partition";
 
     private Map<String, Object> configsMap;
 
@@ -118,8 +119,12 @@ public class DeltaConfig implements Serializable {
       return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 
0).toString());
     }
 
+    public int getStartPartition() {
+      return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 
0).toString());
+    }
+
     public int getNumUpsertFiles() {
-      return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 
1).toString());
+      return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 
0).toString());
     }
 
     public double getFractionUpsertPerFile() {
@@ -207,6 +212,11 @@ public class DeltaConfig implements Serializable {
         return this;
       }
 
+      public Builder withStartPartition(int startPartition) {
+        this.configsMap.put(START_PARTITION, startPartition);
+        return this;
+      }
+
       public Builder withNumTimesToRepeat(int repeatCount) {
         this.configsMap.put(REPEAT_COUNT, repeatCount);
         return this;
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index 21a84db..e457f0a 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -67,10 +67,11 @@ public class WriterContext {
       this.schemaProvider = 
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
       String schemaStr = schemaProvider.getSourceSchema().toString();
       this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, 
schemaStr);
+      int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : 
jsc.defaultParallelism();
       this.deltaGenerator = new DeltaGenerator(
           new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), 
DeltaInputType.valueOf(cfg.inputFormatName),
               new SerializableConfiguration(jsc.hadoopConfiguration()), 
cfg.inputBasePath, cfg.targetBasePath,
-              schemaStr, cfg.limitFileSize),
+              schemaStr, cfg.limitFileSize, inputParallelism, 
cfg.deleteOldInput),
           jsc, sparkSession, schemaStr, keyGenerator);
       log.info(String.format("Initialized writerContext with: %s", schemaStr));
     } catch (Exception e) {
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 8dc7f4b..dc991b1 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -28,9 +28,17 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
 import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
 import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
@@ -41,7 +49,6 @@ import 
org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
-import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -58,7 +65,7 @@ public class DeltaGenerator implements Serializable {
 
   private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class);
 
-  private DeltaConfig deltaOutputConfig;
+  private DFSDeltaConfig deltaOutputConfig;
   private transient JavaSparkContext jsc;
   private transient SparkSession sparkSession;
   private String schemaStr;
@@ -66,7 +73,7 @@ public class DeltaGenerator implements Serializable {
   private List<String> partitionPathFieldNames;
   private int batchId;
 
-  public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, 
SparkSession sparkSession,
+  public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext 
jsc, SparkSession sparkSession,
                         String schemaStr, BuiltinKeyGenerator keyGenerator) {
     this.deltaOutputConfig = deltaOutputConfig;
     this.jsc = jsc;
@@ -77,6 +84,16 @@ public class DeltaGenerator implements Serializable {
   }
 
   public JavaRDD<DeltaWriteStats> writeRecords(JavaRDD<GenericRecord> records) 
{
+    if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) {
+      Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), 
Integer.toString(batchId - 1));
+      try {
+        FileSystem fs = FSUtils.getFs(oldInputDir.toString(), 
deltaOutputConfig.getConfiguration());
+        fs.delete(oldInputDir, true);
+      } catch (IOException e) {
+        log.error("Failed to delete older input data direcory " + oldInputDir, 
e);
+      }
+    }
+
     // The following creates a new anonymous function for iterator and hence 
results in serialization issues
     JavaRDD<DeltaWriteStats> ws = records.mapPartitions(itr -> {
       try {
@@ -95,11 +112,22 @@ public class DeltaGenerator implements Serializable {
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
-    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(numPartitions).mapPartitions(p -> {
+    int startPartition = operation.getStartPartition();
+
+    // Each spark partition below will generate records for a single partition 
given by the integer index.
+    List<Integer> partitionIndexes = IntStream.rangeClosed(0 + startPartition, 
numPartitions + startPartition)
+        .boxed().collect(Collectors.toList());
+
+    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, 
numPartitions)
+        .mapPartitionsWithIndex((index, p) -> {
           return new LazyRecordGeneratorIterator(new 
FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
-            minPayloadSize, schemaStr, partitionPathFieldNames, 
numPartitions));
-        });
+            minPayloadSize, schemaStr, partitionPathFieldNames, 
(Integer)index));
+        }, true);
+
+    if (deltaOutputConfig.getInputParallelism() < numPartitions) {
+      inputBatch = 
inputBatch.coalesce(deltaOutputConfig.getInputParallelism());
+    }
+
     return inputBatch;
   }
 
@@ -131,9 +159,11 @@ public class DeltaGenerator implements Serializable {
           }
         }
 
-        log.info("Repartitioning records");
         // persist this since we will make multiple passes over this
-        adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
+        int numPartition = Math.min(deltaOutputConfig.getInputParallelism(),
+            Math.max(1, config.getNumUpsertPartitions()));
+        log.info("Repartitioning records into " + numPartition + " 
partitions");
+        adjustedRDD = adjustedRDD.repartition(numPartition);
         log.info("Repartitioning records done");
         UpdateConverter converter = new UpdateConverter(schemaStr, 
config.getRecordSize(),
             partitionPathFieldNames, recordRowKeyFieldNames);
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
index 512118f..270dcd1 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
@@ -18,8 +18,11 @@
 
 package org.apache.hudi.integ.testsuite.generator;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
@@ -37,18 +40,18 @@ public class FlexibleSchemaRecordGenerationIterator 
implements Iterator<GenericR
   // Store last record for the partition path of the first payload to be used 
for all subsequent generated payloads
   private GenericRecord lastRecord;
   // Partition path field name
-  private List<String> partitionPathFieldNames;
+  private Set<String> partitionPathFieldNames;
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, 
String schema) {
-    this(maxEntriesToProduce, 
GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 
GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS);
+    this(maxEntriesToProduce, 
GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
   }
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int 
minPayloadSize, String schemaStr,
-      List<String> partitionPathFieldNames, int numPartitions) {
+      List<String> partitionPathFieldNames, int partitionIndex) {
     this.counter = maxEntriesToProduce;
-    this.partitionPathFieldNames = partitionPathFieldNames;
+    this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
     Schema schema = new Schema.Parser().parse(schemaStr);
-    this.generator = new GenericRecordFullPayloadGenerator(schema, 
minPayloadSize, numPartitions);
+    this.generator = new GenericRecordFullPayloadGenerator(schema, 
minPayloadSize, partitionIndex);
   }
 
   @Override
@@ -60,7 +63,7 @@ public class FlexibleSchemaRecordGenerationIterator 
implements Iterator<GenericR
   public GenericRecord next() {
     this.counter--;
     if (lastRecord == null) {
-      GenericRecord record = this.generator.getNewPayload();
+      GenericRecord record = 
this.generator.getNewPayload(partitionPathFieldNames);
       lastRecord = record;
       return record;
     } else {
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index df9a449..f61fad6 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -22,10 +22,12 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -47,14 +49,13 @@ public class GenericRecordFullPayloadGenerator implements 
Serializable {
   private static Logger LOG = 
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
 
   public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
-  public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
   protected final Random random = new Random();
   // The source schema used to generate a payload
   private final transient Schema baseSchema;
   // Used to validate a generic record
   private final transient GenericData genericData = new GenericData();
-  // The number of unique dates to create
-  private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
+  // The index of partition for which records are being generated
+  private int partitionIndex = 0;
   // The size of a full record where every field of a generic record created 
contains 1 random value
   private final int estimatedFullPayloadSize;
   // Number of extra entries to add in a complex/collection field to achieve 
the desired record size
@@ -89,9 +90,9 @@ public class GenericRecordFullPayloadGenerator implements 
Serializable {
     }
   }
 
-  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, 
int numDatePartitions) {
+  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, 
int partitionIndex) {
     this(schema, minPayloadSize);
-    this.numDatePartitions = numDatePartitions;
+    this.partitionIndex = partitionIndex;
   }
 
   protected static boolean isPrimitive(Schema localSchema) {
@@ -115,7 +116,50 @@ public class GenericRecordFullPayloadGenerator implements 
Serializable {
   }
 
   protected GenericRecord getNewPayload(Schema schema) {
-    return randomize(new GenericData.Record(schema), null);
+    return create(schema, null);
+  }
+
+  /**
+   * Create a new {@link GenericRecord} with random value according to given 
schema.
+   *
+   * Long fields which are specified within partitionPathFieldNames are 
constrained to the value of the partition
+   * for which records are being generated.
+   *
+   * @return {@link GenericRecord} with random value
+   */
+  public GenericRecord getNewPayload(Set<String> partitionPathFieldNames) {
+    return create(baseSchema, partitionPathFieldNames);
+  }
+
+  protected GenericRecord create(Schema schema, Set<String> 
partitionPathFieldNames) {
+    GenericRecord result = new GenericData.Record(schema);
+    for (Schema.Field f : schema.getFields()) {
+      if (isPartialLongField(f, partitionPathFieldNames)) {
+        // This is a long field used as partition field. Set it to seconds 
since epoch.
+        long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
+        result.put(f.name(), (long)value);
+      } else {
+        result.put(f.name(), typeConvert(f));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Return true if this is a partition field of type long which should be set 
to the partition index.
+   * @return
+   */
+  private boolean isPartialLongField(Schema.Field field, Set<String> 
partitionPathFieldNames) {
+    if ((partitionPathFieldNames == null) || 
!partitionPathFieldNames.contains(field.name())) {
+      return false;
+    }
+
+    Schema fieldSchema = field.schema();
+    if (isOption(fieldSchema)) {
+      fieldSchema = getNonNull(fieldSchema);
+    }
+
+    return fieldSchema.getType() == org.apache.avro.Schema.Type.LONG;
   }
 
   /**
@@ -125,7 +169,7 @@ public class GenericRecordFullPayloadGenerator implements 
Serializable {
    * @param blacklistFields Fields whose value should not be touched
    * @return The updated {@link GenericRecord}
    */
-  public GenericRecord getUpdatePayload(GenericRecord record, List<String> 
blacklistFields) {
+  public GenericRecord getUpdatePayload(GenericRecord record, Set<String> 
blacklistFields) {
     return randomize(record, blacklistFields);
   }
 
@@ -158,7 +202,7 @@ public class GenericRecordFullPayloadGenerator implements 
Serializable {
    * @param blacklistFields blacklistFields where the filed will not be 
randomized.
    * @return Randomized GenericRecord.
    */
-  protected GenericRecord randomize(GenericRecord record, List<String> 
blacklistFields) {
+  protected GenericRecord randomize(GenericRecord record, Set<String> 
blacklistFields) {
     for (Schema.Field f : record.getSchema().getFields()) {
       if (blacklistFields == null || !blacklistFields.contains(f.name())) {
         record.put(f.name(), typeConvert(f));
@@ -167,12 +211,6 @@ public class GenericRecordFullPayloadGenerator implements 
Serializable {
     return record;
   }
 
-  private long getNextConstrainedLong() {
-    int numPartitions = random.nextInt(numDatePartitions);
-    long unixTimeStamp = TimeUnit.SECONDS.convert(numPartitions, 
TimeUnit.DAYS);
-    return unixTimeStamp;
-  }
-
   /**
    * Generate random value according to their type.
    */
@@ -191,7 +229,7 @@ public class GenericRecordFullPayloadGenerator implements 
Serializable {
       case INT:
         return random.nextInt();
       case LONG:
-        return getNextConstrainedLong();
+        return random.nextLong();
       case STRING:
        return UUID.randomUUID().toString();
       case ENUM:
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
index a33ef0c..d9d137a 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.integ.testsuite.generator;
 
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
@@ -31,14 +33,14 @@ public class UpdateGeneratorIterator implements 
Iterator<GenericRecord> {
 
   // Use the full payload generator as default
   private GenericRecordFullPayloadGenerator generator;
-  private List<String> blackListedFields;
+  private Set<String> blackListedFields;
   // iterator
   private Iterator<GenericRecord> itr;
 
   public UpdateGeneratorIterator(Iterator<GenericRecord> itr, String 
schemaStr, List<String> partitionPathFieldNames,
       List<String> recordKeyFieldNames, int minPayloadSize) {
     this.itr = itr;
-    this.blackListedFields = new ArrayList<>();
+    this.blackListedFields = new HashSet<>();
     this.blackListedFields.addAll(partitionPathFieldNames);
     this.blackListedFields.addAll(recordKeyFieldNames);
     Schema schema = new Schema.Parser().parse(schemaStr);
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index e209118..cfe7991 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -36,6 +36,7 @@ import java.util.stream.StreamSupport;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
@@ -78,8 +79,10 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
   }
 
   protected List<String> getPartitions(Option<Integer> partitionsLimit) throws 
IOException {
-    List<String> partitionPaths = FSUtils
-        .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), 
false);
+    // Using FSUtils.getFS here instead of metaClient.getFS() since we dont 
want to count these listStatus
+    // calls in metrics as they are not part of normal HUDI operation.
+    FileSystem fs = FSUtils.getFs(metaClient.getBasePath(), 
metaClient.getHadoopConf());
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, 
metaClient.getBasePath(), false);
     // Sort partition so we can pick last N partitions by default
     Collections.sort(partitionPaths);
     if (!partitionPaths.isEmpty()) {
@@ -136,6 +139,9 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
     // Read all file slices in the partition
     JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice = 
getPartitionToFileSlice(metaClient,
         partitionPaths);
+    Map<String, Integer> partitionToFileIdCountMap = partitionToFileSlice
+        .mapToPair(p -> new Tuple2<>(p._1, iteratorSize(p._2))).collectAsMap();
+
     // TODO : read record count from metadata
     // Read the records in a single file
     long recordsInSingleFile = 
iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));
@@ -144,7 +150,11 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
     if (!numFiles.isPresent() || numFiles.get() == 0) {
       // If num files are not passed, find the number of files to update based 
on total records to update and records
       // per file
-      numFilesToUpdate = (int) (numRecordsToUpdate.get() / 
recordsInSingleFile);
+      numFilesToUpdate = (int)Math.ceil((double)numRecordsToUpdate.get() / 
recordsInSingleFile);
+      // recordsInSingleFile is not average so we still need to account for 
bias is records distribution
+      // in the files. Limit to the maximum number of files available.
+      int totalExistingFilesCount = 
partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
+      numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
       log.info("Files to update {}", numFilesToUpdate);
       numRecordsToUpdatePerFile = recordsInSingleFile;
     } else {
@@ -154,9 +164,10 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
       numRecordsToUpdatePerFile = percentageRecordsPerFile.isPresent() ? 
(long) (recordsInSingleFile
           * percentageRecordsPerFile.get()) : numRecordsToUpdate.get() / 
numFilesToUpdate;
     }
+
     // Adjust the number of files to read per partition based on the requested 
partition & file counts
     Map<String, Integer> adjustedPartitionToFileIdCountMap = 
getFilesToReadPerPartition(partitionToFileSlice,
-        partitionPaths.size(), numFilesToUpdate);
+        partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap);
     JavaRDD<GenericRecord> updates = 
projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap,
         partitionToFileSlice, numFilesToUpdate, (int) 
numRecordsToUpdatePerFile));
     if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && 
numFiles.get() != 0 && numRecordsToUpdate.get()
@@ -190,10 +201,7 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
   }
 
   private Map<String, Integer> getFilesToReadPerPartition(JavaPairRDD<String, 
Iterator<FileSlice>>
-      partitionToFileSlice, Integer numPartitions, Integer numFiles) {
-    int numFilesPerPartition = (int) Math.ceil(numFiles / numPartitions);
-    Map<String, Integer> partitionToFileIdCountMap = partitionToFileSlice
-        .mapToPair(p -> new Tuple2<>(p._1, iteratorSize(p._2))).collectAsMap();
+      partitionToFileSlice, Integer numPartitions, Integer numFiles, 
Map<String, Integer> partitionToFileIdCountMap) {
     long totalExistingFilesCount = 
partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
     ValidationUtils.checkArgument(totalExistingFilesCount >= numFiles, "Cannot 
generate updates "
         + "for more files than present in the dataset, file requested " + 
numFiles + ", files present "
@@ -204,7 +212,9 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
         .sorted(comparingByValue())
         .collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2,
             LinkedHashMap::new));
+
     // Limit files to be read per partition
+    int numFilesPerPartition = (int) Math.ceil((double)numFiles / 
numPartitions);
     Map<String, Integer> adjustedPartitionToFileIdCountMap = new HashMap<>();
     partitionToFileIdCountSortedMap.entrySet().stream().forEach(e -> {
       if (e.getValue() <= numFilesPerPartition) {
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index ff41b44..ff92bd0 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -125,7 +125,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends 
UtilitiesTestBase {
   public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws 
IOException {
     DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, 
DeltaInputType.AVRO,
         new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, 
dfsBasePath,
-        schemaProvider.getSourceSchema().toString(), 10240L);
+        schemaProvider.getSourceSchema().toString(), 10240L, 
jsc.defaultParallelism(), false);
     DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = 
DeltaWriterFactory
         .getDeltaWriterAdapter(dfsSinkConfig, 1);
     FlexibleSchemaRecordGenerationIterator itr = new 
FlexibleSchemaRecordGenerationIterator(1000,
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
index 7524d4a..9451595 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
@@ -24,7 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.IntStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -92,7 +94,8 @@ public class TestGenericRecordPayloadGenerator {
       insertRowKeys.add(record.get("_row_key").toString());
       insertTimeStamps.add((Long) record.get("timestamp"));
     });
-    List<String> blacklistFields = Arrays.asList("_row_key");
+    Set<String> blacklistFields = new HashSet<>();
+    blacklistFields.add("_row_key");
     records.stream().forEach(a -> {
       // Generate 10 updated records
       GenericRecord record = payloadGenerator.getUpdatePayload(a, 
blacklistFields);

Reply via email to