This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 63a37211384f320b3e4af00a8f2dd46dd280e9cd
Author: lokesh-lingarajan-0310 <[email protected]>
AuthorDate: Tue Sep 12 05:45:44 2023 -0700

    [HUDI-6724] - Defaulting previous Instant time to init time to enable full read of initial commit (#9473)
    
    This matters for new onboarding: the old code initialized prev = start = first-commit-time,
    and the incremental read that follows only fetches entries > prev,
    in which case part of the first commit is skipped during processing.
    
    ---------
    
    Co-authored-by: Lokesh Lingarajan <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../sources/helpers/IncrSourceHelper.java          |  11 +-
 .../utilities/sources/helpers/QueryRunner.java     |   6 ++
 .../sources/helpers/TestIncrSourceHelper.java      | 120 +++++++++++++++++++++
 3 files changed, 136 insertions(+), 1 deletion(-)
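A minimal, self-contained sketch of the off-by-one this commit fixes; the timeline
below is a simplified stand-in with made-up numeric instants, not the actual Hudi
timeline API:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class CheckpointSketch {
      public static void main(String[] args) {
        // Commits on the timeline, identified by numeric instant timestamps.
        List<Long> commits = Arrays.asList(100L, 200L, 300L);

        // Old behavior: on first onboarding, prev defaulted to the begin
        // instant itself (prev == start == first commit time).
        long begin = 100L;
        long prevOld = begin;

        // The incremental read keeps entries strictly greater than prev,
        // so records written at instant 100 were silently dropped.
        List<Long> readOld = commits.stream()
            .filter(c -> c > prevOld).collect(Collectors.toList()); // [200, 300]

        // Fixed behavior: when begin is the first commit on the timeline,
        // prev = begin - 1, so the strict '>' now includes the initial commit.
        long prevNew = begin - 1;
        List<Long> readNew = commits.stream()
            .filter(c -> c > prevNew).collect(Collectors.toList()); // [100, 200, 300]

        System.out.println(readOld + " vs " + readNew);
      }
    }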

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index ceec1851ee9..8b40edcf044 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -130,11 +130,20 @@ public class IncrSourceHelper {
       }
     });
 
-    String previousInstantTime = beginInstantTime;
+    // When `beginInstantTime` is present, `previousInstantTime` is set to the completed commit before `beginInstantTime` if that exists.
+    // If there is no completed commit before `beginInstantTime`, e.g., `beginInstantTime` is the first commit in the active timeline,
+    // `previousInstantTime` is set to `DEFAULT_BEGIN_TIMESTAMP`.
+    String previousInstantTime = DEFAULT_BEGIN_TIMESTAMP;
     if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
       Option<HoodieInstant> previousInstant = activeCommitTimeline.findInstantBefore(beginInstantTime);
       if (previousInstant.isPresent()) {
         previousInstantTime = previousInstant.get().getTimestamp();
+      } else {
+        // if begin instant time matches first entry in active timeline, we can set previous = beginInstantTime - 1
+        if (activeCommitTimeline.filterCompletedInstants().firstInstant().isPresent()
+            && activeCommitTimeline.filterCompletedInstants().firstInstant().get().getTimestamp().equals(beginInstantTime)) {
+          previousInstantTime = String.valueOf(Long.parseLong(beginInstantTime) - 1);
+        }
       }
     }
 
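As a worked example of the new fallback branch (instant value illustrative, in
Hudi's yyyyMMddHHmmss style):

    // beginInstantTime equals the first completed instant on the timeline,
    // so previousInstantTime is derived by subtracting 1 from the numeric value.
    String beginInstantTime = "20230912054544";
    String previousInstantTime = String.valueOf(Long.parseLong(beginInstantTime) - 1);
    // previousInstantTime == "20230912054543"; an exclusive-begin incremental
    // read (entries > previous) now covers the whole first commit.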
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index f65930d18ff..761e942549c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -54,6 +54,12 @@ public class QueryRunner {
     this.sourcePath = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
   }
 
+  /**
+   * Executes queries for cloud store incremental pipelines.
+   * Regular Hudi incremental queries do not take this flow.
+   * @param queryInfo all meta info about the query to be executed.
+   * @return the output of the query as a {@code Dataset<Row>}.
+   */
   public Dataset<Row> run(QueryInfo queryInfo) {
     Dataset<Row> dataset = null;
     if (queryInfo.isIncremental()) {
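For contrast with the cloud-store flow above, a regular Hudi incremental query is
usually issued straight through the Spark DataSource; a minimal sketch, assuming
an existing SparkSession `spark` and a hypothetical table path:

    // Plain Hudi incremental read via DataSource options; cloud-store
    // incremental pipelines route through QueryRunner.run(queryInfo) instead.
    Dataset<Row> df = spark.read().format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.read.begin.instanttime", "20230912054543")
        .option("hoodie.datasource.read.end.instanttime", "20230912060000")
        .load("/path/to/hudi/table");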
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
index 78020697c2e..9ce864aceae 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
@@ -18,13 +18,31 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -35,6 +53,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +61,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -49,10 +69,15 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
 
   private ObjectMapper mapper = new ObjectMapper();
   private JavaSparkContext jsc;
+  private HoodieTableMetaClient metaClient;
+
+  private static final Schema S3_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource(
+      TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc", true);
 
   @BeforeEach
   public void setUp() throws IOException {
     jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
   }
 
   private String generateS3EventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime)
@@ -247,4 +272,99 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
     assertEquals("commit3#path/to/file8.json", result.getKey().toString());
     assertTrue(!result.getRight().isPresent());
   }
+
+  private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
+    String partitionPath = bucketName;
+    Schema schema = S3_METADATA_SCHEMA;
+    GenericRecord rec = new GenericData.Record(schema);
+    Schema.Field s3Field = schema.getField("s3");
+    Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the record schema is the second type
+    // Create a generic record for the "s3" field
+    GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+    Schema.Field s3BucketField = s3Schema.getField("bucket");
+    Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming the record schema is the second type
+    GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+    s3BucketRec.put("name", bucketName);
+
+
+    Schema.Field s3ObjectField = s3Schema.getField("object");
+    Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming the record schema is the second type
+    GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+    s3ObjectRec.put("key", objectKey);
+    s3ObjectRec.put("size", objectSize);
+
+    s3Record.put("bucket", s3BucketRec);
+    s3Record.put("object", s3ObjectRec);
+    rec.put("s3", s3Record);
+    rec.put("_hoodie_commit_time", commitTime);
+
+    HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+    return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), payload);
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(S3_METADATA_SCHEMA.toString())
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .forTable(metaClient.getTableConfig().getTableName());
+  }
+
+  private HoodieWriteConfig getWriteConfig() {
+    return getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
+  }
+
+  private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTime) throws IOException {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+
+    writeClient.startCommitWithTime(commitTime);
+    List<HoodieRecord> s3MetadataRecords = Arrays.asList(
+        generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
+    );
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    return Pair.of(commitTime, s3MetadataRecords);
+  }
+
+  // Tests to validate previous, begin and end instants during query generation for
+  // different missing checkpoint strategies
+  @Test
+  void testQueryInfoGeneration() throws IOException {
+    String commitTimeForReads = "1";
+    String commitTimeForWrites = "2";
+
+    Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+    String startInstant = commitTimeForReads;
+    String orderColumn = "_hoodie_commit_time";
+    String keyColumn = "s3.object.key";
+    String limitColumn = "s3.object.size";
+    QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, Option.of(startInstant), null,
+        TimelineUtils.HollowCommitHandling.BLOCK, orderColumn, keyColumn, limitColumn, true, Option.empty());
+    assertEquals(String.valueOf(Integer.parseInt(commitTimeForReads) - 1), queryInfo.getPreviousInstant());
+    assertEquals(commitTimeForReads, queryInfo.getStartInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getEndInstant());
+
+    startInstant = commitTimeForWrites;
+    queryInfo = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, Option.of(startInstant), null,
+        TimelineUtils.HollowCommitHandling.BLOCK, orderColumn, keyColumn, limitColumn, true, Option.empty());
+    assertEquals(commitTimeForReads, queryInfo.getPreviousInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getStartInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getEndInstant());
+
+  }
 }
\ No newline at end of file
