This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 63a37211384f320b3e4af00a8f2dd46dd280e9cd
Author: lokesh-lingarajan-0310 <[email protected]>
AuthorDate: Tue Sep 12 05:45:44 2023 -0700

    [HUDI-6724] - Defaulting previous Instant time to init time to enable full read of initial commit (#9473)

    This happens during new onboarding: the old code initialized prev = start = first commit time,
    and the incremental read that follows always fetches entries > prev, so part of the first
    commit was skipped during processing. (A simplified sketch of the corrected checkpoint
    resolution follows the diff below.)

    ---------

    Co-authored-by: Lokesh Lingarajan <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../sources/helpers/IncrSourceHelper.java          |  11 +-
 .../utilities/sources/helpers/QueryRunner.java     |   6 ++
 .../sources/helpers/TestIncrSourceHelper.java      | 120 +++++++++++++++++++++
 3 files changed, 136 insertions(+), 1 deletion(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index ceec1851ee9..8b40edcf044 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -130,11 +130,20 @@ public class IncrSourceHelper {
       }
     });
 
-    String previousInstantTime = beginInstantTime;
+    // When `beginInstantTime` is present, `previousInstantTime` is set to the completed commit before `beginInstantTime` if one exists.
+    // If there is no completed commit before `beginInstantTime`, e.g., `beginInstantTime` is the first commit in the active timeline,
+    // `previousInstantTime` is set to `DEFAULT_BEGIN_TIMESTAMP`.
+    String previousInstantTime = DEFAULT_BEGIN_TIMESTAMP;
     if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
       Option<HoodieInstant> previousInstant = activeCommitTimeline.findInstantBefore(beginInstantTime);
       if (previousInstant.isPresent()) {
         previousInstantTime = previousInstant.get().getTimestamp();
+      } else {
+        // If the begin instant time matches the first entry in the active timeline, set previous = beginInstantTime - 1.
+        if (activeCommitTimeline.filterCompletedInstants().firstInstant().isPresent()
+            && activeCommitTimeline.filterCompletedInstants().firstInstant().get().getTimestamp().equals(beginInstantTime)) {
+          previousInstantTime = String.valueOf(Long.parseLong(beginInstantTime) - 1);
+        }
       }
     }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index f65930d18ff..761e942549c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -54,6 +54,12 @@ public class QueryRunner {
     this.sourcePath = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
   }
 
+  /**
+   * Executes queries for cloud-store incremental pipelines.
+   * Regular Hudi incremental queries do not take this path.
+   * @param queryInfo all meta information about the query to be executed.
+   * @return the output of the query as a {@code Dataset<Row>}.
+   */
   public Dataset<Row> run(QueryInfo queryInfo) {
     Dataset<Row> dataset = null;
     if (queryInfo.isIncremental()) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
index 78020697c2e..9ce864aceae 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
@@ -18,13 +18,31 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -35,6 +53,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +61,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -49,10 +69,15 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
 
   private ObjectMapper mapper = new ObjectMapper();
   private JavaSparkContext jsc;
+  private HoodieTableMetaClient metaClient;
+
+  private static final Schema S3_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource(
+      TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc", true);
 
   @BeforeEach
   public void setUp() throws IOException {
     jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
   }
 
   private String generateS3EventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime)
@@ -247,4 +272,99 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
     assertEquals("commit3#path/to/file8.json", result.getKey().toString());
     assertTrue(!result.getRight().isPresent());
   }
+
+  private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
+    String partitionPath = bucketName;
+    Schema schema = S3_METADATA_SCHEMA;
+    GenericRecord rec = new GenericData.Record(schema);
+    Schema.Field s3Field = schema.getField("s3");
+    Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the record schema is the second type
+    // Create a generic record for the "s3" field
+    GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+    Schema.Field s3BucketField = s3Schema.getField("bucket");
+    Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming the record schema is the second type
+    GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+    s3BucketRec.put("name", bucketName);
+
+
+    Schema.Field s3ObjectField = s3Schema.getField("object");
+    Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming the record schema is the second type
+    GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+    s3ObjectRec.put("key", objectKey);
+    s3ObjectRec.put("size", objectSize);
+
+    s3Record.put("bucket", s3BucketRec);
+    s3Record.put("object", s3ObjectRec);
+    rec.put("s3", s3Record);
+    rec.put("_hoodie_commit_time", commitTime);
+
+    HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+    return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), payload);
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(S3_METADATA_SCHEMA.toString())
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .forTable(metaClient.getTableConfig().getTableName());
+  }
+
+  private HoodieWriteConfig getWriteConfig() {
+    return getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
+  }
+
+  private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTime) throws IOException {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+
+    writeClient.startCommitWithTime(commitTime);
+    List<HoodieRecord> s3MetadataRecords = Arrays.asList(
+        generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
+    );
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    return Pair.of(commitTime, s3MetadataRecords);
+  }
+
+  // Tests to validate previous, begin and end instants during query generation for
+  // different missing-checkpoint strategies
+  @Test
+  void testQueryInfoGeneration() throws IOException {
+    String commitTimeForReads = "1";
+    String commitTimeForWrites = "2";
+
+    Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+    String startInstant = commitTimeForReads;
+    String orderColumn = "_hoodie_commit_time";
+    String keyColumn = "s3.object.key";
+    String limitColumn = "s3.object.size";
+
+    QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, Option.of(startInstant), null,
+        TimelineUtils.HollowCommitHandling.BLOCK, orderColumn, keyColumn, limitColumn, true, Option.empty());
+    assertEquals(String.valueOf(Integer.parseInt(commitTimeForReads) - 1), queryInfo.getPreviousInstant());
+    assertEquals(commitTimeForReads, queryInfo.getStartInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getEndInstant());
+
+    startInstant = commitTimeForWrites;
+    queryInfo = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, Option.of(startInstant), null,
+        TimelineUtils.HollowCommitHandling.BLOCK, orderColumn, keyColumn, limitColumn, true, Option.empty());
+    assertEquals(commitTimeForReads, queryInfo.getPreviousInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getStartInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getEndInstant());
+
+  }
 }
\ No newline at end of file
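For readers without the full IncrSourceHelper class at hand, here is a minimal, self-contained sketch of the checkpoint resolution this commit corrects. This is not the Hudi implementation shown in the diff above: the timeline is modeled as a plain sorted list of completed instant timestamps, and the class name PreviousInstantSketch and the literal DEFAULT_BEGIN_TIMESTAMP value are illustrative assumptions only.

import java.util.List;
import java.util.Optional;

public class PreviousInstantSketch {
  // Stands in for IncrSourceHelper.DEFAULT_BEGIN_TIMESTAMP; the literal value is illustrative only.
  static final String DEFAULT_BEGIN_TIMESTAMP = "000";

  /**
   * Resolves the "previous" instant for an incremental read that fetches entries strictly
   * greater than the returned value. completedInstants is a sorted list of completed commit
   * timestamps standing in for the active commit timeline.
   */
  static String resolvePreviousInstant(List<String> completedInstants, String beginInstantTime) {
    if (beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
      return DEFAULT_BEGIN_TIMESTAMP;
    }
    // Latest completed commit strictly before beginInstantTime, if any.
    Optional<String> before = completedInstants.stream()
        .filter(ts -> ts.compareTo(beginInstantTime) < 0)
        .reduce((first, second) -> second);
    if (before.isPresent()) {
      return before.get();
    }
    // beginInstantTime is the first commit on the timeline: back off by one so the
    // "entries > previous" read still includes the first commit instead of skipping it.
    if (!completedInstants.isEmpty() && completedInstants.get(0).equals(beginInstantTime)) {
      return String.valueOf(Long.parseLong(beginInstantTime) - 1);
    }
    return DEFAULT_BEGIN_TIMESTAMP;
  }

  public static void main(String[] args) {
    List<String> timeline = List.of("1", "2");
    // Old behavior set previous = begin = "1", so the "> previous" filter dropped commit "1".
    // With the fix, previous becomes "0" and commit "1" is read in full.
    System.out.println(resolvePreviousInstant(timeline, "1")); // 0
    System.out.println(resolvePreviousInstant(timeline, "2")); // 1
  }
}

Run against a two-commit timeline as above, the old behavior (previous = start = "1") made the incremental read drop commit "1" entirely, while the begin - 1 fallback reads the first commit in full; this mirrors what testQueryInfoGeneration asserts via queryInfo.getPreviousInstant().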
