This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ed0462b49ada72f1c30ef3fb0bba09f14c94f231
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 05:44:06 2024 -0700

    [HUDI-7673] Fixing false positive validation failure for RLI with MDT validation tool (#11098)
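
    The false positive: a commit that is inflight when the FS-based listing is
    taken may complete by the time the metadata table (MDT) is polled, so the
    record level index (RLI) can legitimately hold entries the listing never
    saw. The fix pins both reads to the same latest completed commit via
    time-travel and skips RLI entries newer than that commit. A minimal Java
    sketch of the core rule (names as used in the patch below):

        // an RLI entry is a potential false positive when its location's
        // instant time is newer than the latest completed commit observed
        // when validation started
        boolean falsePositive = HoodieTimeline.compareTimestamps(
            location.getInstantTime(), GREATER_THAN, latestCompletedCommit);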
---
 .../utilities/HoodieMetadataTableValidator.java    | 118 ++++++++++++++-------
 .../TestHoodieMetadataTableValidator.java          | 118 ++++++++++++++++++++-
 2 files changed, 195 insertions(+), 41 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 62a42e56964..0ec37e4a8fa 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities;
 
+import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.async.HoodieAsyncService;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -37,7 +38,6 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -67,6 +67,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.utilities.util.BloomFilterData;
 
 import com.beust.jcommander.JCommander;
@@ -77,6 +78,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,6 +103,10 @@ import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 
@@ -540,7 +546,7 @@ public class HoodieMetadataTableValidator implements Serializable {
           }).collectAsList());
 
       try {
-        validateRecordIndex(engineContext, metaClient, metadataTableBasedContext.getTableMetadata());
+        validateRecordIndex(engineContext, metaClient);
         result.add(Pair.of(true, null));
       } catch (HoodieValidationException e) {
         LOG.error(
@@ -638,7 +644,7 @@ public class HoodieMetadataTableValidator implements Serializable {
           if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
             Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
             if (lastInstant.isPresent()
-                && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), HoodieTimeline.GREATER_THAN, lastInstant.get().getTimestamp())) {
+                && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().getTimestamp())) {
               LOG.warn("Ignoring additional partition {}, as it was deduced to be part of a "
                   + "latest completed commit which was inflight when FS based listing was polled.", partitionFromDMT);
               actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
@@ -886,10 +892,12 @@ public class HoodieMetadataTableValidator implements Serializable {
   }
 
   private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
-                                   HoodieTableMetaClient metaClient,
-                                   HoodieTableMetadata tableMetadata) {
+                                   HoodieTableMetaClient metaClient) {
+    if (!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) {
+      return;
+    }
     if (cfg.validateRecordIndexContent) {
-      validateRecordIndexContent(sparkEngineContext, metaClient, tableMetadata);
+      validateRecordIndexContent(sparkEngineContext, metaClient);
     } else if (cfg.validateRecordIndexCount) {
       validateRecordIndexCount(sparkEngineContext, metaClient);
     }
@@ -898,11 +906,15 @@ public class HoodieMetadataTableValidator implements Serializable {
   private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext,
                                         HoodieTableMetaClient metaClient) {
     String basePath = metaClient.getBasePathV2().toString();
+    String latestCompletedCommit = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline()
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
     long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
         .load(basePath)
-        .select(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        .select(RECORD_KEY_METADATA_FIELD)
         .count();
     long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
         .load(getMetadataTableBasePath(basePath))
         .select("key")
         .filter("type = 5")
@@ -919,43 +931,15 @@ public class HoodieMetadataTableValidator implements Serializable {
   }
 
   private void validateRecordIndexContent(HoodieSparkEngineContext sparkEngineContext,
-                                          HoodieTableMetaClient metaClient,
-                                          HoodieTableMetadata tableMetadata) {
+                                          HoodieTableMetaClient metaClient) {
     String basePath = metaClient.getBasePathV2().toString();
+    String latestCompletedCommit = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline()
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
     JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
-        sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
-            .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD)
-            .toJavaRDD()
-            .mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)),
-                Pair.of(row.getString(row.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)),
-                    FSUtils.getFileId(row.getString(row.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD))))))
-            .cache();
+        getRecordLocationsFromFSBasedListing(sparkEngineContext, basePath, latestCompletedCommit);
 
     JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
-        sparkEngineContext.getSqlContext().read().format("hudi")
-            .load(getMetadataTableBasePath(basePath))
-            .filter("type = 5")
-            .select(functions.col("key"),
-                functions.col("recordIndexMetadata.partitionName").as("partitionName"),
-                functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
-                functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
-                functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
-                functions.col("recordIndexMetadata.fileId").as("fileId"),
-                functions.col("recordIndexMetadata.instantTime").as("instantTime"),
-                functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
-            .toJavaRDD()
-            .mapToPair(row -> {
-              HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
-                  row.getString(row.fieldIndex("partitionName")),
-                  row.getInt(row.fieldIndex("fileIdEncoding")),
-                  row.getLong(row.fieldIndex("fileIdHighBits")),
-                  row.getLong(row.fieldIndex("fileIdLowBits")),
-                  row.getInt(row.fieldIndex("fileIndex")),
-                  row.getString(row.fieldIndex("fileId")),
-                  row.getLong(row.fieldIndex("instantTime")));
-              return new Tuple2<>(row.getString(row.fieldIndex("key")),
-                  Pair.of(location.getPartitionPath(), location.getFileId()));
-            });
+        getRecordLocationsFromRLI(sparkEngineContext, basePath, latestCompletedCommit);
 
     int numErrorSamples = cfg.numRecordIndexErrorSamples;
     Pair<Long, List<String>> result = keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, cfg.recordIndexParallelism)
@@ -1032,6 +1016,60 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
   }
 
+  @VisibleForTesting
+  JavaPairRDD<String, Pair<String, String>> getRecordLocationsFromFSBasedListing(HoodieSparkEngineContext sparkEngineContext,
+                                                                                 String basePath,
+                                                                                 String latestCompletedCommit) {
+    return sparkEngineContext.getSqlContext().read().format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
+        .load(basePath)
+        .select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD)
+        .toJavaRDD()
+        .mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
+            Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
+                FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))))))
+        .cache();
+  }
+
+  @VisibleForTesting
+  JavaPairRDD<String, Pair<String, String>> getRecordLocationsFromRLI(HoodieSparkEngineContext sparkEngineContext,
+                                                                      String basePath,
+                                                                      String latestCompletedCommit) {
+    return sparkEngineContext.getSqlContext().read().format("hudi")
+        .load(getMetadataTableBasePath(basePath))
+        .filter("type = 5")
+        .select(functions.col("key"),
+            functions.col("recordIndexMetadata.partitionName").as("partitionName"),
+            functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
+            functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
+            functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
+            functions.col("recordIndexMetadata.fileId").as("fileId"),
+            functions.col("recordIndexMetadata.instantTime").as("instantTime"),
+            functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
+        .toJavaRDD()
+        .map(row -> {
+          HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+              row.getString(row.fieldIndex("partitionName")),
+              row.getInt(row.fieldIndex("fileIdEncoding")),
+              row.getLong(row.fieldIndex("fileIdHighBits")),
+              row.getLong(row.fieldIndex("fileIdLowBits")),
+              row.getInt(row.fieldIndex("fileIndex")),
+              row.getString(row.fieldIndex("fileId")),
+              row.getLong(row.fieldIndex("instantTime")));
+          // handle false positive case: a commit was pending when FS-based locations were fetched, but had completed by the time the MDT was polled.
+          if (HoodieTimeline.compareTimestamps(location.getInstantTime(), GREATER_THAN, latestCompletedCommit)) {
+            return new Tuple2<>(row, Option.empty());
+          } else {
+            return new Tuple2<>(row, Option.of(location));
+          }
+        }).filter(tuple2 -> tuple2._2.isPresent()) // filter the false positives
+        .mapToPair(tuple2 -> {
+          Tuple2<Row, Option<HoodieRecordGlobalLocation>> rowAndLocation = (Tuple2<Row, Option<HoodieRecordGlobalLocation>>) tuple2;
+          return new Tuple2<>(rowAndLocation._1.getString(rowAndLocation._1.fieldIndex("key")),
+              Pair.of(rowAndLocation._2.get().getPartitionPath(), rowAndLocation._2.get().getFileId()));
+        }).cache();
+  }
+
   private String constructLocationInfoString(String recordKey, Optional<Pair<String, String>> locationOnFs,
                                              Optional<Pair<String, String>> locationFromRecordIndex) {
     StringBuilder sb = new StringBuilder();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index dd6ee4730ba..a9af0146db1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -27,10 +28,16 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 
+import jodd.io.FileUtil;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -59,7 +66,6 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
 
   @Test
   public void testMetadataTableValidation() {
-
     Map<String, String> writeOptions = new HashMap<>();
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put("hoodie.table.name", "test_table");
@@ -71,11 +77,17 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     Dataset<Row> inserts = makeInsertDf("000", 5).cache();
     inserts.write().format("hudi").options(writeOptions)
         .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
         .mode(SaveMode.Overwrite)
         .save(basePath);
     Dataset<Row> updates = makeUpdateDf("001", 5).cache();
     updates.write().format("hudi").options(writeOptions)
         .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
         .mode(SaveMode.Append)
         .save(basePath);
 
@@ -196,6 +208,110 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     }
   }
 
+  @Test
+  public void testRliValidationFalsePositiveCase() throws IOException {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+
+    Dataset<Row> inserts = makeInsertDf("000", 5).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+    Dataset<Row> updates = makeUpdateDf("001", 5).cache();
+    updates.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    Dataset<Row> inserts2 = makeInsertDf("002", 5).cache();
+    inserts2.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    // validate MDT
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file://" + basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+
+    // let's ensure we have a pending commit when FS-based polling is done, and that the commit completes by the time the MDT is polled.
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())).build();
+    // moving out the completed commit meta file to a temp location
+    HoodieInstant lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get();
+    String latestCompletedCommitMetaFile = basePath + "/.hoodie/" + lastInstant.getFileName();
+    String tempDir = getTempLocation();
+    String destFilePath = tempDir + "/" + lastInstant.getFileName();
+    FileUtil.move(latestCompletedCommitMetaFile, destFilePath);
+
+    MockHoodieMetadataTableValidatorForRli validator = new MockHoodieMetadataTableValidatorForRli(jsc, config);
+    validator.setOriginalFilePath(latestCompletedCommitMetaFile);
+    validator.setDestFilePath(destFilePath);
+    assertTrue(validator.run());
+    assertFalse(validator.hasValidationFailure());
+    assertTrue(validator.getThrowables().isEmpty());
+  }
+
+  /**
+   * Class to assist with testing a false positive case with RLI validation.
+   */
+  static class MockHoodieMetadataTableValidatorForRli extends HoodieMetadataTableValidator {
+
+    private String destFilePath;
+    private String originalFilePath;
+
+    public MockHoodieMetadataTableValidatorForRli(JavaSparkContext jsc, Config cfg) {
+      super(jsc, cfg);
+    }
+
+    @Override
+    JavaPairRDD<String, Pair<String, String>> getRecordLocationsFromRLI(HoodieSparkEngineContext sparkEngineContext,
+                                                                        String basePath,
+                                                                        String latestCompletedCommit) {
+      // move the completed file back to ".hoodie" to simulate the false positive case.
+      try {
+        FileUtil.move(destFilePath, originalFilePath);
+        return super.getRecordLocationsFromRLI(sparkEngineContext, basePath, latestCompletedCommit);
+      } catch (IOException e) {
+        throw new HoodieException("Move should not have failed");
+      }
+    }
+
+    public void setDestFilePath(String destFilePath) {
+      this.destFilePath = destFilePath;
+    }
+
+    public void setOriginalFilePath(String originalFilePath) {
+      this.originalFilePath = originalFilePath;
+    }
+  }
+
+  private String getTempLocation() {
+    try {
+      String folderName = "temp_location";
+      java.nio.file.Path tempPath = tempDir.resolve(folderName);
+      java.nio.file.Files.createDirectories(tempPath);
+      return tempPath.toAbsolutePath().toString();
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
   protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
     List<String> records = dataGen.generateInserts(instantTime, n).stream()
         .map(r -> recordToString(r).get()).collect(Collectors.toList());
