jonvex commented on code in PR #9422:
URL: https://github.com/apache/hudi/pull/9422#discussion_r1304369713


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMORColstats.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static 
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
+import static org.apache.spark.sql.SaveMode.Append;
+import static org.apache.spark.sql.SaveMode.Overwrite;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test mor with colstats enabled in scenarios to ensure that files
+ * are being appropriately read or not read.
+ * The strategy employed is to corrupt targeted base files. If we want
+ * to prove the file is read, we assert that an exception will be thrown.
+ * If we want to prove the file is not read, we expect the read to
+ * successfully execute.
+ */
+public class TestMORColstats extends HoodieSparkClientTestBase {
+
+  private static String matchCond = "trip_type = 'UBERX'";
+  private static String nonMatchCond = "trip_type = 'BLACK'";
+  private static String[] dropColumns = {"_hoodie_commit_time", 
"_hoodie_commit_seqno",
+      "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name"};
+
+  private Boolean shouldOverwrite;
+  Map<String, String> options;
+  @TempDir
+  public java.nio.file.Path basePath;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts();
+    dataGen = new HoodieTestDataGenerator();
+    shouldOverwrite = true;
+    options = getOptions();
+    Properties props = new Properties();
+    props.putAll(options);
+    try {
+      metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
basePath.toString(), props);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    metaClient = null;
+  }
+
+  /**
+   * Create two files, one should be excluded by colstats
+   */
+  @Test
+  public void testBaseFileOnly() {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    filesToCorrupt.forEach(TestMORColstats::corruptFile);
+    assertEquals(0, readMatchingRecords().except(batch1).count());
+    //Read without data skipping to show that it will fail
+    //Reading with data skipping succeeded so that means that data skipping is 
working and the corrupted
+    //file was not read
+    assertThrows(SparkException.class, () -> 
readMatchingRecords(false).count());
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatches() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match
+   * Then do a compaction
+   * Now you have two base files that match
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDoCompaction() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file for each filegroup that contains exactly the same 
records as the base file
+   * Then schedule an async compaction
+   * Then add a log file so that both file groups match the condition
+   * The new log file is a member of a newer file slice
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAsyncCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(true, false,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then add a delete for that record so that the file group no longer 
matches the condition
+   * both file groups must still be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,true, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then add a delete for that record so that the file group no longer 
matches the condition
+   * Then compact
+   * Only the first file group needs to be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlockCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,true, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then delete the deltacommit and write the original value for the
+   *    record so that a rollback is triggered and the file group no
+   *    longer matches the condition
+   * both filegroups should be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBack() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,false, true);
+  }
+
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then delete the deltacommit and write the original value for the
+   *    record so that a rollback is triggered and the file group no
+   *    longer matches the condition
+   * Do Compaction
+   * Only 1 filegroup should be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBackCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,false, true);
+  }
+
+  /**
+   * Test where one filegroup doesn't match the condition, then update so both 
filegroups match
+   */
+  private void testBaseFileAndLogFileUpdateMatchesHelper(Boolean 
shouldScheduleCompaction,
+                                                         Boolean 
shouldInlineCompact,
+                                                         Boolean shouldDelete,
+                                                         Boolean 
shouldRollback) {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    if (shouldScheduleCompaction) {
+      doWrite(inserts);
+      scheduleCompaction();
+    }
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    Dataset<Row> recordToUpdate = batch2.limit(1);
+    Dataset<Row> updatedRecord = makeRecordMatch(recordToUpdate);
+    doWrite(updatedRecord);
+    if (shouldRollback) {
+      deleteLatestDeltacommit();
+      enableInlineCompaction(shouldInlineCompact);
+      doWrite(recordToUpdate);
+      assertEquals(0, readMatchingRecords().except(batch1).count());
+    } else if (shouldDelete) {
+      enableInlineCompaction(shouldInlineCompact);
+      doDelete(updatedRecord);
+      assertEquals(0, readMatchingRecords().except(batch1).count());
+    } else {
+      assertEquals(0, 
readMatchingRecords().except(batch1.union(updatedRecord)).count());
+    }
+
+    if (shouldInlineCompact) {
+      filesToCorrupt = getFilesToCorrupt();
+      filesToCorrupt.forEach(TestMORColstats::corruptFile);
+      if (shouldDelete || shouldRollback) {
+        //filesToCorrupt includes all base files in the filegroup
+        assertEquals(2, filesToCorrupt.size());
+        assertEquals(0, readMatchingRecords().except(batch1).count());
+      } else {
+        enableInlineCompaction(true);
+        doWrite(updatedRecord);
+        assertEquals(0, filesToCorrupt.size());
+      }
+    } else {
+      //Corrupt to prove that colstats does not exclude filegroup
+      filesToCorrupt.forEach(TestMORColstats::corruptFile);
+      assertEquals(1, filesToCorrupt.size());
+      assertThrows(SparkException.class, () -> readMatchingRecords().count());

Review Comment:
   @nsivabalan we should update the config description then
   ```
     public static final ConfigProperty<String> INLINE_COMPACT = ConfigProperty
         .key("hoodie.compact.inline")
         .defaultValue("false")
         .withDocumentation("When set to true, compaction service is triggered 
after each write. While being "
             + " simpler operationally, this adds extra latency on the write 
path.");
   
     public static final ConfigProperty<String> SCHEDULE_INLINE_COMPACT = 
ConfigProperty
         .key("hoodie.compact.schedule.inline")
         .defaultValue("false")
         .markAdvanced()
         .withDocumentation("When set to true, compaction service will be 
attempted for inline scheduling after each write. Users have to ensure "
             + "they have a separate job to run async compaction(execution) for 
the one scheduled by this writer. Users can choose to set both "
             + "`hoodie.compact.inline` and `hoodie.compact.schedule.inline` to 
false and have both scheduling and execution triggered by any async process. "
             + "But if `hoodie.compact.inline` is set to false, and 
`hoodie.compact.schedule.inline` is set to true, regular writers will schedule 
compaction inline, "
             + "but users are expected to trigger async job for execution. If 
`hoodie.compact.inline` is set to true, regular writers will do both scheduling 
and "
             + "execution inline for compaction");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to