satishkotha commented on a change in pull request #2635:
URL: https://github.com/apache/hudi/pull/2635#discussion_r588059204



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
##########
@@ -64,4 +80,246 @@ public void testGetFileWriter() throws IOException {
     }, "should fail since log storage writer is not supported yet.");
     assertTrue(thrown.getMessage().contains("format not supported yet."));
   }
+  
+  // key: full file path (/tmp/.../partition0000/file-000.parquet, value: 
column range 
+  @Test
+  public void testPerformanceRangeKeyPartitionFile() throws IOException {
+    final String instantTime = "100";
+    final HoodieWriteConfig cfg = getConfig();
+    final Path hfilePath = new Path(basePath + 
"/hfile_partition/f1_1-0-1_000.hfile");
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieHFileWriter<HoodieRecordPayload, IndexedRecord> hfileWriter = 
(HoodieHFileWriter<HoodieRecordPayload, IndexedRecord>) 
HoodieFileWriterFactory.getFileWriter(instantTime,
+        hfilePath, table, cfg, HoodieMetadataRecord.SCHEMA$, supplier);
+    Random random = new Random();
+    
+    int numPartitions = 1000;
+    int avgFilesPerPartition = 10;
+
+    long startTime = System.currentTimeMillis();
+    List<String> partitions = new ArrayList<>();
+    for (int i = 0; i < numPartitions; i++) {
+      String partitionPath = "partition-" + String.format("%010d", i);
+      partitions.add(partitionPath);
+      for (int j = 0; j < avgFilesPerPartition; j++) {
+        String filePath = "file-" + String.format("%010d", j) + 
"_1-0-1_000.parquet";
+        int max = random.nextInt();
+        if (max < 0) {
+          max = -max;
+        }
+        int min = random.nextInt(max);
+
+        HoodieKey key = new HoodieKey(partitionPath + filePath, partitionPath);
+        GenericRecord rec = new 
GenericData.Record(HoodieMetadataRecord.SCHEMA$);
+        rec.put("key", key.getRecordKey());
+        rec.put("type", 2);
+        rec.put("rangeIndexMetadata", 
HoodieRangeIndexInfo.newBuilder().setMax("" + max).setMin("" + 
min).setIsDeleted(false).build());
+        hfileWriter.writeAvro(key.getRecordKey(), rec);
+      }
+    }
+    
+    hfileWriter.close();
+    long durationInMs = System.currentTimeMillis() - startTime;
+    System.out.println("Time taken to generate & write: " + durationInMs + " 
ms. file path: " + hfilePath
+        + " File size: " + FSUtils.getFileSize(metaClient.getFs(), hfilePath));
+    
+    CacheConfig cacheConfig = new CacheConfig(hadoopConf);
+    cacheConfig.setCacheDataInL1(false);
+    HoodieHFileReader reader = new HoodieHFileReader(hadoopConf, hfilePath, 
cacheConfig);
+    long duration  = 0;
+    int numRuns = 1000;
+    long numRecordsInRange = 0;
+    for (int i = 0; i < numRuns; i++) {
+      int partitionPicked = Math.max(0, partitions.size() - 30);
+      long start = System.currentTimeMillis();
+      Map<String, GenericRecord> records = 
reader.getRecordsInRange(partitions.get(partitionPicked), 
partitions.get(partitions.size() - 1));
+      duration += (System.currentTimeMillis() - start);
+      numRecordsInRange += records.size();
+    }
+    double avgDuration = duration / (double) numRuns;
+    double avgRecordsFetched = numRecordsInRange / (double) numRuns;
+    System.out.println("Average time taken to lookup a range: " + avgDuration 
+ "ms. Avg number records: " + avgRecordsFetched);
+  }
+
+  // key: partition (partition0000), value: map (filePath -> column range)
+  @Test
+  public void testPerformanceRangeKeyPartition() throws IOException {
+    final String instantTime = "100";
+    final HoodieWriteConfig cfg = getConfig();
+    final Path hfilePath = new Path(basePath + 
"/hfile_partition/f1_1-0-1_000.hfile");
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieHFileWriter<HoodieRecordPayload, IndexedRecord> hfileWriter = 
(HoodieHFileWriter<HoodieRecordPayload, IndexedRecord>) 
HoodieFileWriterFactory.getFileWriter(instantTime,
+        hfilePath, table, cfg, HoodieMetadataRecord.SCHEMA$, supplier);
+    Random random = new Random();
+
+    int numPartitions = 1;
+    int avgFilesPerPartition = 10;
+
+    long startTime = System.currentTimeMillis();
+    List<String> partitions = new ArrayList<>();
+    for (int i = 0; i < numPartitions; i++) {
+      String partitionPath = "partition-" + String.format("%010d", i);
+      partitions.add(partitionPath);
+      Map<String, HoodieRangeIndexInfo> fileToRangeInfo = new HashMap<>();
+      for (int j = 0; j < avgFilesPerPartition; j++) {
+        String filePath = "file-" + String.format("%010d", j) + 
"_1-0-1_000.parquet";
+        int max = random.nextInt();
+        if (max < 0) {
+          max = -max;
+        }
+        int min = random.nextInt(max);
+        fileToRangeInfo.put(filePath, 
HoodieRangeIndexInfo.newBuilder().setMax("" + max).setMin("" + 
min).setIsDeleted(false).build());
+      }
+
+      HoodieKey key = new HoodieKey(partitionPath, partitionPath);
+      GenericRecord rec = new GenericData.Record(HoodieMetadataRecord.SCHEMA$);
+      rec.put("key", key.getRecordKey());
+      rec.put("type", 2);
+      rec.put("partitionRangeIndexMetadata", fileToRangeInfo);
+      hfileWriter.writeAvro(key.getRecordKey(), rec);
+    }
+
+    hfileWriter.close();
+    long durationInMs = System.currentTimeMillis() - startTime;
+    System.out.println("Time taken to generate & write: " + durationInMs + " 
ms. file path: " + hfilePath
+        + " File size: " + FSUtils.getFileSize(metaClient.getFs(), hfilePath));
+    
+    CacheConfig cacheConfig = new CacheConfig(hadoopConf);
+    cacheConfig.setCacheDataInL1(false);
+    HoodieHFileReader reader = new HoodieHFileReader(hadoopConf, hfilePath, 
cacheConfig);
+    GenericRecord record = (GenericRecord) 
reader.getRecordByKey(partitions.get(0), reader.getSchema()).get();
+    assertEquals(avgFilesPerPartition, ((Map<String, HoodieRangeIndexInfo> 
)record.get("partitionRangeIndexMetadata")).size());
+    long duration  = 0;
+    int numRuns = 1000;
+    long numRecordsInRange = 0;
+    for (int i = 0; i < numRuns; i++) {
+      int partitionPicked = Math.max(0, partitions.size() - 30);
+      long start = System.currentTimeMillis();
+      Map<String, GenericRecord> records = (Map<String, GenericRecord>) 
+          reader.getRecordsInRange(partitions.get(partitionPicked), 
partitions.get(partitions.size() - 1));
+      numRecordsInRange += records.size();
+      duration += (System.currentTimeMillis() - start);
+    }
+    double avgDuration = duration / (double) numRuns;
+    double avgRecordsFetched = numRecordsInRange / (double) numRuns;
+    System.out.println("Average time taken to lookup a range: " + avgDuration 
+ "ms. Avg number records: " + avgRecordsFetched);
+  }
+
+  // key: column range (400), value: list (filePaths containing that range)
+  @Test
+  public void testPerformanceRangeKeyColumnRange() throws IOException {
+    final String instantTime = "100";
+    final HoodieWriteConfig cfg = getConfig();
+    final Path hfilePath = new Path(basePath + 
"/hfile_partition/f1_1-0-1_000.hfile");
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieHFileWriter<HoodieRecordPayload, IndexedRecord> hfileWriter = 
(HoodieHFileWriter<HoodieRecordPayload, IndexedRecord>) 
HoodieFileWriterFactory.getFileWriter(instantTime,
+        hfilePath, table, cfg, HoodieMetadataRecord.SCHEMA$, supplier);
+    Random random = new Random();
+
+    int numKeys = 300;

Review comment:
       @vinothchandar @prashantwason PTAL at the test code here for perf 
measurement and let me know if you have suggestions on layouts to try. The 2 tests 
above this one are similar (with minor changes to the layout and record type used).
   
   Also, maybe look at the new method I added, HoodieHFileReader#getRecordsInRange, 
and let me know if you see any obvious performance improvements. (Functionally, 
it may miss a few records at the beginning in some cases. I'll fix that part.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to