umehrot2 commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r463889895



##########
File path: hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
##########
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.RecordDataBootstrapInputProvider;
+import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
+import 
org.apache.hudi.client.bootstrap.selector.RecordDataBootstrapModeSelector;
+import 
org.apache.hudi.client.bootstrap.selector.RecordMetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieBootstrapConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.api.java.UDF1;
+import org.apache.spark.sql.types.DataTypes;
+
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Random;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.generateGenericRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.spark.sql.functions.callUDF;
+
+/**
+ * Tests Bootstrap Client functionality.
+ */
+public class TestBootstrap extends HoodieClientTestBase {
+
+  //FIXME(bootstrap): why is this test so darn slow?
+
+  public static final String TRIP_HIVE_COLUMN_TYPES = 
"double,string,string,string,double,double,double,double,"
+      + 
"struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
+
+  @TempDir
+  public java.nio.file.Path tmpFolder;
+
+  protected String srcPath = null;
+
+  private HoodieParquetInputFormat roInputFormat;
+  private JobConf roJobConf;
+
+  private HoodieParquetRealtimeInputFormat rtInputFormat;
+  private JobConf rtJobConf;
+  private SparkSession spark;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    srcPath = tmpFolder.toAbsolutePath().toString() + "/data";
+    initPath();
+    spark = SparkSession.builder()
+        .appName("Bootstrap test")
+        .master("local[2]")
+        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+    jsc = new JavaSparkContext(spark.sparkContext());
+    sqlContext = spark.sqlContext();
+    hadoopConf = spark.sparkContext().hadoopConfiguration();
+    initTestDataGenerator();
+    initMetaClient();
+    // initialize parquet input format
+    reloadInputFormats();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupClients();
+    cleanupTestDataGenerator();
+  }
+
+  private void reloadInputFormats() {
+    // initialize parquet input format
+    roInputFormat = new HoodieParquetInputFormat();
+    roJobConf = new JobConf(jsc.hadoopConfiguration());
+    roInputFormat.setConf(roJobConf);
+
+    rtInputFormat = new HoodieParquetRealtimeInputFormat();
+    rtJobConf = new JobConf(jsc.hadoopConfiguration());
+    rtInputFormat.setConf(rtJobConf);
+  }
+
+  public Schema generateNewDataSetAndReturnSchema(double timestamp, int 
numRecords, List<String> partitionPaths,
+      String srcPath) throws Exception {
+    boolean isPartitioned = partitionPaths != null && 
!partitionPaths.isEmpty();
+    Dataset<Row> df = generateTestRawTripDataset(timestamp, numRecords, 
partitionPaths, jsc, sqlContext);
+    df.printSchema();
+    if (isPartitioned) {
+      
df.write().partitionBy("datestr").format("parquet").mode(SaveMode.Overwrite).save(srcPath);
+    } else {
+      df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
+    }
+    String filePath = 
FileStatusUtils.toPath(FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), 
srcPath,
+        (status) -> 
status.getName().endsWith(".parquet")).stream().findAny().map(p -> 
p.getValue().stream().findAny())
+        .orElse(null).get().getPath()).toString();
+    ParquetFileReader reader = 
ParquetFileReader.open(metaClient.getHadoopConf(), new Path(filePath));
+    MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+    return new AvroSchemaConverter().convert(schema);
+  }
+
+  @Test
+  public void testMetadataBootstrapUnpartitionedCOW() throws Exception {
+    testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetadataBootstrapWithUpdatesCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  private enum EffectiveMode {
+    FULL_BOOTSTRAP_MODE,
+    METADATA_BOOTSTRAP_MODE,
+    MIXED_BOOTSTRAP_MODE
+  }
+
+  private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, 
EffectiveMode mode) throws Exception {
+    if (deltaCommit) {
+      metaClient = HoodieTestUtils.init(basePath, 
HoodieTableType.MERGE_ON_READ);
+    }
+    int totalRecords = 100;
+    String keyGeneratorClass = partitioned ? 
SimpleKeyGenerator.class.getCanonicalName()
+        : NonpartitionedKeyGenerator.class.getCanonicalName();
+    final String bootstrapModeSelectorClass;
+    final String bootstrapCommitInstantTs;
+    final boolean checkNumRawFiles;
+    final boolean isBootstrapIndexCreated;
+    final int numInstantsAfterBootstrap;
+    final List<String> bootstrapInstants;
+    switch (mode) {
+      case FULL_BOOTSTRAP_MODE:
+        bootstrapModeSelectorClass = 
RecordDataBootstrapModeSelector.class.getCanonicalName();
+        bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = false;
+        isBootstrapIndexCreated = false;
+        numInstantsAfterBootstrap = 1;
+        bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
+        break;
+      case METADATA_BOOTSTRAP_MODE:
+        bootstrapModeSelectorClass = 
RecordMetadataOnlyBootstrapModeSelector.class.getCanonicalName();
+        bootstrapCommitInstantTs = 
HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = true;
+        isBootstrapIndexCreated = true;
+        numInstantsAfterBootstrap = 1;
+        bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
+        break;
+      default:
+        bootstrapModeSelectorClass = 
TestRandomBootstapModeSelector.class.getName();
+        bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = false;
+        isBootstrapIndexCreated = true;
+        numInstantsAfterBootstrap = 2;
+        bootstrapInstants = 
Arrays.asList(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+            HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
+        break;
+    }
+    List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02", 
"2020/04/03");
+    double timestamp = new Double(Instant.now().toEpochMilli()).longValue();
+    Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, 
partitions, srcPath);
+    HoodieWriteConfig config = getConfigBuilder(schema.toString())
+        .withAutoCommit(true)
+        .withSchema(schema.toString())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .build())
+        .withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
+            .withBootstrapSourceBasePath(srcPath)
+            .withBootstrapKeyGenClass(keyGeneratorClass)
+            
.withFullBootstrapInputProvider(FullTestBootstrapInputProvider.class.getName())
+            .withBootstrapParallelism(3)
+            .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
+        .build();
+    HoodieWriteClient client = new HoodieWriteClient(jsc, config);
+    client.bootstrap(Option.empty());
+    checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, 
checkNumRawFiles, numInstantsAfterBootstrap,
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, 
bootstrapInstants);
+
+    // Rollback Bootstrap
+    FSUtils.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), 
new HoodieInstant(State.COMPLETED,
+        deltaCommit ? HoodieTimeline.DELTA_COMMIT_ACTION : 
HoodieTimeline.COMMIT_ACTION, bootstrapCommitInstantTs));
+    client.rollBackInflightBootstrap();
+    metaClient.reloadActiveTimeline();
+    assertEquals(0, metaClient.getCommitsTimeline().countInstants());
+    assertEquals(0L, FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), 
basePath,
+        (status) -> status.getName().endsWith(".parquet")).stream().flatMap(f 
-> f.getValue().stream()).count());
+
+    BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
+    assertFalse(index.isIndexAvailable());
+
+    // Run bootstrap again
+    client = new HoodieWriteClient(jsc, config);
+    client.bootstrap(Option.empty());
+
+    metaClient.reloadActiveTimeline();
+    index = BootstrapIndex.getBootstrapIndex(metaClient);
+    if (isBootstrapIndexCreated) {
+      assertTrue(index.isIndexAvailable());
+    } else {
+      assertFalse(index.isIndexAvailable());
+    }
+
+    checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, 
checkNumRawFiles, numInstantsAfterBootstrap,
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, 
bootstrapInstants);
+
+    // Upsert case
+    double updateTimestamp = new 
Double(Instant.now().toEpochMilli()).longValue();
+    String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
+    generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, 
partitions, updateSPath);
+    JavaRDD<HoodieRecord> updateBatch =
+        generateInputBatch(jsc, 
FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), updateSPath,
+            (status) -> status.getName().endsWith("parquet")), schema);
+    String newInstantTs = client.startCommit();
+    client.upsert(updateBatch, newInstantTs);
+    checkBootstrapResults(totalRecords, schema, newInstantTs, false, 
numInstantsAfterBootstrap + 1,
+        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, 
deltaCommit);
+
+    if (deltaCommit) {
+      Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
+      assertTrue(compactionInstant.isPresent());
+      client.compact(compactionInstant.get());
+      checkBootstrapResults(totalRecords, schema, compactionInstant.get(), 
checkNumRawFiles,
+          numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, 
!deltaCommit,
+          Arrays.asList(compactionInstant.get()));
+    }
+  }
+
+  @Test
+  public void testMetadataBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testFullBoostrapOnlyCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testFullBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.FULL_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetaAndFullBoostrapCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.MIXED_BOOTSTRAP_MODE);
+  }
+
+  private void checkBootstrapResults(int totalRecords, Schema schema, String 
maxInstant, boolean checkNumRawFiles,
+      int expNumInstants, double expTimestamp, double expROTimestamp, boolean 
isDeltaCommit) throws Exception {
+    checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, 
expNumInstants, expNumInstants,
+        expTimestamp, expROTimestamp, isDeltaCommit, 
Arrays.asList(maxInstant));
+  }
+
+  private void checkBootstrapResults(int totalRecords, Schema schema, String 
instant, boolean checkNumRawFiles,
+      int expNumInstants, int numVersions, double expTimestamp, double 
expROTimestamp, boolean isDeltaCommit,
+      List<String> instantsWithValidRecords) throws Exception {
+    metaClient.reloadActiveTimeline();
+    assertEquals(expNumInstants, 
metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
+    assertEquals(instant, metaClient.getActiveTimeline()
+        
.getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+
+    Dataset<Row> bootstrapped = 
sqlContext.read().format("parquet").load(basePath);
+    Dataset<Row> original = sqlContext.read().format("parquet").load(srcPath);
+    bootstrapped.registerTempTable("bootstrapped");
+    original.registerTempTable("original");
+    if (checkNumRawFiles) {
+      List<HoodieFileStatus> files = 
FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), srcPath,
+          (status) -> status.getName().endsWith(".parquet"))
+          .stream().flatMap(x -> 
x.getValue().stream()).collect(Collectors.toList());
+      assertEquals(files.size() * numVersions,
+          sqlContext.sql("select distinct _hoodie_file_name from 
bootstrapped").count());
+    }
+
+    if (!isDeltaCommit) {
+      String predicate = String.join(", ",
+          instantsWithValidRecords.stream().map(p -> "\"" + p + 
"\"").collect(Collectors.toList()));
+      assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped 
where _hoodie_commit_time IN "
+          + "(" + predicate + ")").count());
+      Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from 
original a where a._row_key not "
+          + "in (select _hoodie_record_key from bootstrapped)");
+      assertEquals(0, missingOriginal.count());
+      Dataset<Row> missingBootstrapped = sqlContext.sql("select 
a._hoodie_record_key from bootstrapped a "
+          + "where a._hoodie_record_key not in (select _row_key from 
original)");
+      assertEquals(0, missingBootstrapped.count());
+      //sqlContext.sql("select * from bootstrapped").show(10, false);
+    }
+
+    // RO Input Format Read
+    reloadInputFormats();
+    List<GenericRecord> records = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, 
false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new 
ArrayList<>());
+    assertEquals(totalRecords, records.size());
+    Set<String> seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertEquals(r.get("_row_key").toString(), 
r.get("_hoodie_record_key").toString(), "Record :" + r);
+      assertEquals(expROTimestamp, ((DoubleWritable)r.get("timestamp")).get(), 
0.1, "Record :" + r);
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, 
false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, false, new 
ArrayList<>());
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertEquals(r.get("_row_key").toString(), 
r.get("_hoodie_record_key").toString(), "Realtime Record :" + r);
+      assertEquals(expTimestamp, 
((DoubleWritable)r.get("timestamp")).get(),0.1, "Realtime Record :" + r);
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    // RO Input Format Read - Project only Hoodie Columns
+    reloadInputFormats();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, 
false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES,
+        true, HoodieRecord.HOODIE_META_COLUMNS);
+    assertEquals(totalRecords, records.size());
+    seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read - Project only Hoodie Columns
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, 
false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        HoodieRecord.HOODIE_META_COLUMNS);
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    // RO Input Format Read - Project only non-hoodie column
+    reloadInputFormats();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, 
false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
+        Arrays.asList("_row_key"));
+    assertEquals(totalRecords, records.size());
+    seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_row_key").toString()));
+      seenKeys.add(r.get("_row_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read - Project only non-hoodie column
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, 
false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        Arrays.asList("_row_key"));
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_row_key").toString()));
+      seenKeys.add(r.get("_row_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+  }
+
+  public static class FullTestBootstrapInputProvider extends 
RecordDataBootstrapInputProvider {
+
+    public FullTestBootstrapInputProvider(TypedProperties props, 
JavaSparkContext jsc) {
+      super(props, jsc);
+    }
+
+    @Override
+    public JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName, 
String sourceBasePath,
+        List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
+      String filePath = 
FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> 
p.getValue().stream())
+          .findAny().get().getPath()).toString();
+      ParquetFileReader reader = null;
+      try {
+        reader = ParquetFileReader.open(jsc.hadoopConfiguration(), new 
Path(filePath));
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      MessageType parquetSchema = 
reader.getFooter().getFileMetaData().getSchema();
+      Schema schema =  new AvroSchemaConverter().convert(parquetSchema);
+      return generateInputBatch(jsc, partitionPaths, schema);
+    }
+  }
+
+  private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc,
+      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema 
writerSchema) {
+    List<Pair<String, Path>> fullFilePathsWithPartition = 
partitionPaths.stream().flatMap(p -> p.getValue().stream()
+        .map(x -> Pair.of(p.getKey(), 
FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList());
+    return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> {
+      try {
+        Configuration conf = jsc.hadoopConfiguration();
+        AvroReadSupport.setAvroReadSchema(conf, writerSchema);
+        Iterator<GenericRecord> recIterator = new ParquetReaderIterator(
+            
AvroParquetReader.<GenericRecord>builder(p.getValue()).withConf(conf).build());
+        return 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), 
false).map(gr -> {
+          try {
+            String key = gr.get("_row_key").toString();
+            String pPath = p.getKey();
+            return new HoodieRecord<>(new HoodieKey(key, pPath), new 
RawTripTestPayload(gr.toString(), key, pPath,
+                HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+        });
+      } catch (IOException ioe) {
+        throw new HoodieIOException(ioe.getMessage(), ioe);
+      }
+    }).collect(Collectors.toList()));
+  }
+
+  public static class TestRandomBootstapModeSelector extends 
BootstrapModeSelector {
+
+    private int currIdx = new Random().nextInt(2);
+
+    public TestRandomBootstapModeSelector(HoodieWriteConfig writeConfig) {
+      super(writeConfig);
+    }
+
+    @Override
+    public Map<BootstrapMode, List<String>> select(List<Pair<String, 
List<HoodieFileStatus>>> partitions) {
+      List<Pair<BootstrapMode, String>> selections = new ArrayList<>();
+      partitions.stream().forEach(p -> {
+        final BootstrapMode mode;
+        if (currIdx == 0) {
+          mode = BootstrapMode.RECORD_METADATA_ONLY_BOOTSTRAP;
+        } else {
+          mode = BootstrapMode.RECORD_DATA_BOOTSTRAP;
+        }
+        currIdx = (currIdx + 1) % 2;
+        selections.add(Pair.of(mode, p.getKey()));
+      });
+      return selections.stream().collect(Collectors.groupingBy(Pair::getKey, 
mapping(Pair::getValue, toList())));
+    }
+  }
+
+  public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, 
IndexType.BLOOM)
+        .withExternalSchemaTrasformation(true);
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key");
+    
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"datestr");
+    builder = builder.withProps(properties);
+    return builder;
+  }
+
+  private static Dataset<Row> generateTestRawTripDataset(double timestamp, int 
numRecords, List<String> partitionPaths,

Review comment:
       Can you move this to a more common class, or make it public ? I am using 
this method to generate test data for all my test cases 
https://github.com/apache/hudi/pull/1702/files#diff-1ba0a227dedf7dfe4aa1b666df01f918




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to