codope commented on code in PR #9336:
URL: https://github.com/apache/hudi/pull/9336#discussion_r1284605124


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -267,6 +272,11 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline findInstantsBefore(String instantTime);
 
+  /**
+   * Finds the instant before specified time.
+   */
+  Option<HoodieInstant> findInstantBefore(String instantTime);

Review Comment:
   how about reusing existing `findInstantsBefore`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -241,6 +241,11 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline findInstantsInRange(String startTs, String endTs);
 
+  /**
+   * Create a new Timeline with instants after or equals startTs and before or 
on endTs.
+   */
+  HoodieTimeline findInstantsInClosedRange(String startTs, String endTs);

Review Comment:
   Remove if not used. 
   Also, please add tests in TestHoodieActiveTimeline if adding a new method to 
the timeline.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+public class QueryInfo {

Review Comment:
   some javadoc for this class?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -203,6 +204,12 @@ public HoodieDefaultTimeline findInstantsInRange(String 
startTs, String endTs) {
         getInstantsAsStream().filter(s -> 
HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
   }
 
+  @Override
+  public HoodieDefaultTimeline findInstantsInClosedRange(String startTs, 
String endTs) {

Review Comment:
   let's remove this if not being used.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+
+/**
+ * This class is currently used only by s3 and gcs incr sources that supports 
size based batching
+ * This class will fetch comitted files from the current commit to support 
size based batching.
+ */
+public class QueryRunner {
+  private final SparkSession sparkSession;
+  private final TypedProperties props;

Review Comment:
   `props` is not being used; let's remove it?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+
+/**
+ * This class is currently used only by s3 and gcs incr sources that supports 
size based batching
+ * This class will fetch comitted files from the current commit to support 
size based batching.
+ */
+public class QueryRunner {
+  private final SparkSession sparkSession;
+  private final TypedProperties props;
+  private final String sourcePath;
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
+
+  public QueryRunner(SparkSession sparkSession, TypedProperties props) {
+    this.sparkSession = sparkSession;
+    this.props = props;
+    DataSourceUtils.checkRequiredProperties(props, 
Collections.singletonList(HOODIE_SRC_BASE_PATH));

Review Comment:
   let's use `HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key()` instead of the 
deprecated constant?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -413,6 +423,14 @@ static boolean isInRange(String timestamp, String startTs, 
String endTs) {
             && HoodieTimeline.compareTimestamps(timestamp, 
LESSER_THAN_OR_EQUALS, endTs);
   }
 
+  /**
+   * Return true if specified timestamp is in range [startTs, endTs].
+   */
+  static boolean isInClosedRange(String timestamp, String startTs, String 
endTs) {

Review Comment:
   won't be needed if `findInstantsInClosedRange` is not being used.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -40,10 +47,15 @@
 import static 
org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.sum;
 
 public class IncrSourceHelper {
 
-  private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+  private static final Logger LOG = 
LoggerFactory.getLogger(IncrSourceHelper.class);
+  public static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+  private static final String CUMULATIVE_COLUMN_NAME = "cumulativeSize";
+

Review Comment:
   nit: remove one extra line.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+public class QueryInfo {
+  private final String queryType;
+  private final String previousInstant;
+  private final String startInstant;
+  private final String endInstant;
+  private final String orderColumn;
+  private final String keyColumn;
+  private final String limitColumn;
+  private final List<String> orderByColumns;
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryInfo.class);

Review Comment:
   let's remove the logger



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -40,10 +47,15 @@
 import static 
org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.sum;
 
 public class IncrSourceHelper {
 
-  private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+  private static final Logger LOG = 
LoggerFactory.getLogger(IncrSourceHelper.class);
+  public static final String DEFAULT_BEGIN_TIMESTAMP = "000";

Review Comment:
   Any reason for keeping it to 3 digits only? What if instant parsing fails? 
Can we reuse `INIT_INSTANT_TS` in HoodieTimeline?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -106,17 +129,98 @@ public static Pair<String, Pair<String, String>> 
calculateBeginAndEndInstants(Ja
       }
     });
 
+    String previousInstantTime = beginInstantTime;
+    if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+      Option<HoodieInstant> previousInstant = 
activeCommitTimeline.findInstantBefore(beginInstantTime);
+      if (previousInstant.isPresent()) {
+        previousInstantTime = previousInstant.get().getTimestamp();
+      }
+    }
+
     if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST || 
!activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
-      Option<HoodieInstant> nthInstant = 
Option.fromJavaOptional(activeCommitTimeline
-          .findInstantsAfter(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
-      return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), 
Pair.of(beginInstantTime, 
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
+      Option<HoodieInstant> nthInstant;
+      // When we are in the upgrade code path from non-sourcelimit-based 
batching to sourcelimit-based batching, we need to avoid fetching the commit
+      // that is read already. Else we will have duplicates in append-only use 
case if we use "findInstantsAfterOrEquals".
+      // As soon as we have a new format of checkpoint and a key we will move 
to the new code of fetching the current commit as well.
+      if (sourceLimitBasedBatching && lastCheckpointKey.isPresent()) {
+        nthInstant = Option.fromJavaOptional(activeCommitTimeline
+            .findInstantsAfterOrEquals(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+      } else {
+        nthInstant = Option.fromJavaOptional(activeCommitTimeline
+            .findInstantsAfter(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+      }
+      return new 
QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), 
previousInstantTime,
+          beginInstantTime, 
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime),
+          orderColumn, keyColumn, limitColumn);
     } else {
       // when MissingCheckpointStrategy is set to read everything until 
latest, trigger snapshot query.
       Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
-      return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), 
Pair.of(beginInstantTime, timestampForLastInstant.apply(lastInstant.get())));
+      return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
+          previousInstantTime, beginInstantTime, 
lastInstant.get().getTimestamp(),
+          orderColumn, keyColumn, limitColumn);
     }
   }
 
+  /**
+   * Adjust the source dataset to size based batch based on last checkpoint 
key.
+   *
+   * @param sourceData  Source dataset
+   * @param sourceLimit Max number of bytes to be read from source
+   * @param queryInfo   Query Info
+   * @return end instants along with filtered rows.
+   */
+  public static Pair<CloudObjectIncrCheckpoint, Dataset<Row>> 
filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
+                                                                               
                             long sourceLimit, QueryInfo queryInfo,
+                                                                               
                             CloudObjectIncrCheckpoint 
cloudObjectIncrCheckpoint) {
+    if (sourceData.isEmpty()) {
+      LOG.info("Empty source, returning endpoint:" + 
queryInfo.getEndInstant());
+      return Pair.of(cloudObjectIncrCheckpoint, sourceData);
+    }
+    // Let's persist the dataset to avoid triggering the dag repeatedly
+    sourceData.persist(StorageLevel.MEMORY_AND_DISK());
+    // Set ordering in query to enable batching
+    Dataset<Row> orderedDf = QueryRunner.applyOrdering(sourceData, 
queryInfo.getOrderByColumns());
+    Option<String> lastCheckpoint = 
Option.of(cloudObjectIncrCheckpoint.getCommit());
+    Option<String> lastCheckpointKey = 
Option.ofNullable(cloudObjectIncrCheckpoint.getKey());
+    Option<String> concatenatedKey = lastCheckpoint.flatMap(checkpoint -> 
lastCheckpointKey.map(key -> checkpoint + key));
+
+    // Filter until last checkpoint key
+    if (concatenatedKey.isPresent()) {
+      orderedDf = orderedDf.withColumn("commit_key",
+          functions.concat(functions.col(queryInfo.getOrderColumn()), 
functions.col(queryInfo.getKeyColumn())));
+      // Apply incremental filter
+      orderedDf = 
orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key");
+      // We could be just at the end of the commit, so return empty
+      if (orderedDf.isEmpty()) {
+        LOG.info("Empty ordered source, returning endpoint:" + 
queryInfo.getEndInstant());
+        sourceData.unpersist();
+        return Pair.of(new 
CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), 
orderedDf);
+      }
+    }
+
+    // Limit based on sourceLimit
+    WindowSpec windowSpec = Window.orderBy(col(queryInfo.getOrderColumn()), 
col(queryInfo.getKeyColumn()));
+    // Add the 'cumulativeSize' column with running sum of 'limitColumn'
+    Dataset<Row> aggregatedData = orderedDf.withColumn(CUMULATIVE_COLUMN_NAME,
+        sum(col(queryInfo.getLimitColumn())).over(windowSpec));
+    Dataset<Row> collectedRows = 
aggregatedData.filter(col(CUMULATIVE_COLUMN_NAME).leq(sourceLimit));
+
+    Row row = null;
+    if (collectedRows.isEmpty()) {
+      // If the first element itself exceeds limits then return first element
+      LOG.info("First object exceeding source limit: " + sourceLimit + " 
bytes");
+      row = aggregatedData.select(queryInfo.getOrderColumn(), 
queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).first();
+      collectedRows = aggregatedData.limit(1);
+    } else {
+      // Get the last row and form composite key
+      row = collectedRows.select(queryInfo.getOrderColumn(), 
queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).orderBy(
+          col(queryInfo.getOrderColumn()).desc(), 
col(queryInfo.getKeyColumn()).desc()).first();
+    }
+    LOG.info("Processed batch size: " + row.getLong(2) + " bytes");
+    sourceData.unpersist();
+    return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0), 
row.getString(1)), collectedRows);
+  }
+
   /**
    * Validate instant time seen in the incoming row.

Review Comment:
   the method `validateInstantTime` is not being used. let's remove.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
+  private static final Schema S3_METADATA_SCHEMA = 
SchemaTestUtil.getSchemaFromResource(
+      TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc", 
true);
+
+  private ObjectMapper mapper = new ObjectMapper();
+
+  private static final String MY_BUCKET = "some-bucket";
+
+  @Mock
+  private SchemaProvider mockSchemaProvider;
+  @Mock
+  QueryRunner mockQueryRunner;
+  @Mock
+  CloudDataFetcher mockCloudDataFetcher;
+  private JavaSparkContext jsc;
+  private HoodieTableMetaClient metaClient;
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+  }
+
+  private List<String> getSampleS3ObjectKeys(List<Triple<String, Long, 
String>> filePathSizeAndCommitTime) {
+    return filePathSizeAndCommitTime.stream().map(f -> {
+      try {
+        return generateS3EventMetadata(f.getMiddle(), MY_BUCKET, f.getLeft(), 
f.getRight());
+      } catch (JsonProcessingException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private Dataset<Row> generateDataset(List<Triple<String, Long, String>> 
filePathSizeAndCommitTime) {
+    JavaRDD<String> testRdd = 
jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2);
+    Dataset<Row> inputDs = spark().read().json(testRdd);
+    return inputDs;
+  }
+
+  /**
+   * Generates simple Json structure like below
+   * <p>
+   * s3 : {
+   * object : {
+   * size:
+   * key:
+   * }
+   * bucket: {
+   * name:
+   * }
+   */
+  private String generateS3EventMetadata(Long objectSize, String bucketName, 
String objectKey, String commitTime)
+      throws JsonProcessingException {
+    Map<String, Object> objectMetadata = new HashMap<>();
+    objectMetadata.put("size", objectSize);
+    objectMetadata.put("key", objectKey);
+    Map<String, String> bucketMetadata = new HashMap<>();
+    bucketMetadata.put("name", bucketName);
+    Map<String, Object> s3Metadata = new HashMap<>();
+    s3Metadata.put("object", objectMetadata);
+    s3Metadata.put("bucket", bucketMetadata);
+    Map<String, Object> eventMetadata = new HashMap<>();
+    eventMetadata.put("s3", s3Metadata);
+    eventMetadata.put("_hoodie_commit_time", commitTime);
+    return mapper.writeValueAsString(eventMetadata);
+  }
+
+  private HoodieRecord generateS3EventMetadata(String commitTime, String 
bucketName, String objectKey, Long objectSize) {
+    String partitionPath = bucketName;
+    Schema schema = S3_METADATA_SCHEMA;
+    GenericRecord rec = new GenericData.Record(schema);
+    Schema.Field s3Field = schema.getField("s3");
+    Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the 
record schema is the second type
+    // Create a generic record for the "s3" field
+    GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+    Schema.Field s3BucketField = s3Schema.getField("bucket");
+    Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming 
the record schema is the second type
+    GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+    s3BucketRec.put("name", bucketName);
+
+
+    Schema.Field s3ObjectField = s3Schema.getField("object");
+    Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming 
the record schema is the second type
+    GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+    s3ObjectRec.put("key", objectKey);
+    s3ObjectRec.put("size", objectSize);
+
+    s3Record.put("bucket", s3BucketRec);
+    s3Record.put("object", s3ObjectRec);
+    rec.put("s3", s3Record);
+    rec.put("_hoodie_commit_time", commitTime);
+
+    HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+    return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), 
payload);
+  }
+
+  private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy) {
+    Properties properties = new Properties();
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", 
basePath());
+    
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+        missingCheckpointStrategy.name());
+    
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format", 
"json");

Review Comment:
   should we parameterize this and test other file formats? if so, we can take 
it up even in a followup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to