This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cc96e8  [HUDI-1897] Deltastreamer source for AWS S3 (#3433)
5cc96e8 is described below

commit 5cc96e85c13e56e866ca15063bf08c204d967377
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Aug 14 17:55:10 2021 +0530

    [HUDI-1897] Deltastreamer source for AWS S3 (#3433)
    
    - Added two sources for a two-stage pipeline:
      a. S3EventsSource fetches events from SQS and ingests them into a meta hoodie table.
      b. S3EventsHoodieIncrSource reads S3 events from that meta hoodie table, fetches the actual objects from S3, and ingests them into the sink hoodie table.
    - Added selectors to assist S3EventsSource.
    
    Co-authored-by: Satish M <[email protected]>
    Co-authored-by: Vinoth Chandar <[email protected]>
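
    For reference, a minimal sketch (not part of this commit) of how the two stages
    might be wired together, using only the constructors and config keys introduced
    in this diff; the queue URL is a placeholder value:

        import org.apache.hudi.common.config.TypedProperties;
        import org.apache.hudi.utilities.sources.S3EventsSource;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.sql.SparkSession;

        class S3PipelineSketch {
          // Stage 1: S3EventsSource polls SQS and writes raw S3 event metadata
          // to a "meta" hoodie table.
          static S3EventsSource newMetaSource(JavaSparkContext jsc, SparkSession spark) {
            TypedProperties props = new TypedProperties();
            props.setProperty("hoodie.deltastreamer.s3.source.queue.url",
                "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events"); // placeholder
            props.setProperty("hoodie.deltastreamer.s3.source.queue.region", "us-east-1");
            // null SchemaProvider, as in the tests added by this commit
            return new S3EventsSource(props, jsc, spark, null);
          }
          // Stage 2: S3EventsHoodieIncrSource is then pointed at that meta table via
          // hoodie.deltastreamer.source.hoodieincr.path and loads the referenced objects.
        }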
---
 hudi-utilities/pom.xml                             |   8 +
 .../hudi/utilities/sources/HoodieIncrSource.java   |  32 ++-
 .../sources/S3EventsHoodieIncrSource.java          | 137 ++++++++++
 .../hudi/utilities/sources/S3EventsSource.java     |  87 +++++++
 .../sources/helpers/CloudObjectsSelector.java      | 280 +++++++++++++++++++++
 .../sources/helpers/S3EventsMetaSelector.java      | 161 ++++++++++++
 .../hudi/utilities/sources/TestS3EventsSource.java | 112 +++++++++
 .../sources/helpers/TestCloudObjectsSelector.java  | 226 +++++++++++++++++
 .../sources/helpers/TestS3EventsMetaSelector.java  | 105 ++++++++
 .../utilities/testutils/CloudObjectTestUtils.java  |  98 ++++++++
 .../AbstractCloudObjectsSourceTestBase.java        | 114 +++++++++
 pom.xml                                            |   1 +
 12 files changed, 1348 insertions(+), 13 deletions(-)

diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index c8a58d5..5ef0465 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -402,6 +402,14 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- AWS Services -->
+    <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sqs -->
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-sqs</artifactId>
+      <version>${aws.sdk.version}</version>
+    </dependency>
+    
     <!-- Hive - Test -->
     <dependency>
       <groupId>${hive.groupid}</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index dd841f4..a217e6b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -42,45 +42,51 @@ public class HoodieIncrSource extends RowSource {
 
   private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class);
 
-  protected static class Config {
+  static class Config {
 
     /**
      * {@value #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie table.
      */
-    private static final String HOODIE_SRC_BASE_PATH = "hoodie.deltastreamer.source.hoodieincr.path";
+    static final String HOODIE_SRC_BASE_PATH = "hoodie.deltastreamer.source.hoodieincr.path";
 
     /**
      * {@value #NUM_INSTANTS_PER_FETCH} allows the max number of instants whose changes can be incrementally fetched.
      */
-    private static final String NUM_INSTANTS_PER_FETCH = "hoodie.deltastreamer.source.hoodieincr.num_instants";
-    private static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1;
+    static final String NUM_INSTANTS_PER_FETCH = "hoodie.deltastreamer.source.hoodieincr.num_instants";
+    static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1;
 
     /**
      * {@value #HOODIE_SRC_PARTITION_FIELDS} specifies partition fields that need to be added to the source table after
      * parsing _hoodie_partition_path.
      */
-    private static final String HOODIE_SRC_PARTITION_FIELDS = "hoodie.deltastreamer.source.hoodieincr.partition.fields";
+    static final String HOODIE_SRC_PARTITION_FIELDS = "hoodie.deltastreamer.source.hoodieincr.partition.fields";
 
     /**
      * {@value #HOODIE_SRC_PARTITION_EXTRACTORCLASS} PartitionValueExtractor class to extract partition fields from
      * _hoodie_partition_path.
      */
-    private static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS =
+    static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS =
         "hoodie.deltastreamer.source.hoodieincr.partition.extractor.class";
-    private static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS =
+    static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS =
         SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
 
     /**
      * {@value #READ_LATEST_INSTANT_ON_MISSING_CKPT} allows delta-streamer to incrementally fetch from latest committed
      * instant when checkpoint is not provided.
      */
-    private static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
+    static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
         "hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
-    private static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;
+    static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;
+
+    /**
+     * {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading the dataset. Default value is parquet.
+     */
+    static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
+    static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
   }
 
   public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
+                          SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
   }
 
@@ -123,10 +129,10 @@ public class HoodieIncrSource extends RowSource {
 
     /*
      * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
-     * 
+     *
      * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema
      * = newSchema.add(field, DataTypes.StringType, true); }
-     * 
+     *
      * /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
      * configured
      *
@@ -139,7 +145,7 @@ public class HoodieIncrSource extends RowSource {
      * "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
      * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
      * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
-     * 
+     *
      * log.info("Validated Source Schema :" + validated.schema());
      */
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
new file mode 100644
index 0000000..79e4abb
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;
+
+/**
+ * This source will use the S3 events meta information from the hoodie table generated by {@link S3EventsSource}.
+ */
+public class S3EventsHoodieIncrSource extends HoodieIncrSource {
+
+  private static final Logger LOG = LogManager.getLogger(S3EventsHoodieIncrSource.class);
+
+  static class Config {
+    // control whether we do existence check for files before consuming them
+    static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists";
+    static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false;
+  }
+
+  public S3EventsHoodieIncrSource(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
+    String srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+    int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
+    boolean readLatestOnMissingCkpt = props.getBoolean(
+        READ_LATEST_INSTANT_ON_MISSING_CKPT, DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+
+    // Use begin Instant if set and non-empty
+    Option<String> beginInstant =
+        lastCkptStr.isPresent()
+            ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr
+            : Option.empty();
+
+    Pair<String, String> instantEndpts =
+        IncrSourceHelper.calculateBeginAndEndInstants(
+            sparkContext, srcPath, numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
+
+    if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
+      LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
+      return Pair.of(Option.empty(), instantEndpts.getKey());
+    }
+
+    // Do incremental pull. Set end instant if available.
+    DataFrameReader metaReader = sparkSession.read().format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+        .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
+        .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
+    Dataset<Row> source = metaReader.load(srcPath);
+    // Extract distinct file keys from s3 meta hoodie table
+    final List<Row> cloudMetaDf = source
+        .filter("s3.object.size > 0")
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .collectAsList();
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    List<String> cloudFiles = new ArrayList<>();
+    for (Row row : cloudMetaDf) {
+      // construct file path, row index 0 refers to bucket and 1 refers to key
+      String bucket = row.getString(0);
+      String filePath = S3_PREFIX + bucket + "/" + row.getString(1);
+      if (checkExists) {
+        FileSystem fs = FSUtils.getFs(S3_PREFIX + bucket, sparkSession.sparkContext().hadoopConfiguration());
+        try {
+          if (fs.exists(new Path(filePath))) {
+            cloudFiles.add(filePath);
+          }
+        } catch (IOException e) {
+          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
+        }
+      } else {
+        cloudFiles.add(filePath);
+      }
+    }
+    String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT);
+    Option<Dataset<Row>> dataset = Option.empty();
+    if (!cloudFiles.isEmpty()) {
+      dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0])));
+    }
+    return Pair.of(dataset, instantEndpts.getRight());
+  }
+}
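
A hedged sketch of the properties a user of S3EventsHoodieIncrSource might set, using only keys defined in this diff; the meta-table path is a placeholder, and the optional values shown are simply the defaults:

    import org.apache.hudi.common.config.TypedProperties;

    class S3IncrSourceProps {
      static TypedProperties defaults() {
        TypedProperties props = new TypedProperties();
        // Required: base path of the meta table written by S3EventsSource.
        props.setProperty("hoodie.deltastreamer.source.hoodieincr.path",
            "s3://my-bucket/s3-events-meta"); // placeholder
        // Optional knobs, shown with their default values.
        props.setProperty("hoodie.deltastreamer.source.hoodieincr.num_instants", "1");
        props.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format", "parquet");
        props.setProperty("hoodie.deltastreamer.source.s3incr.check.file.exists", "false");
        return props;
      }
    }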
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
new file mode 100644
index 0000000..717c414
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This source provides the capability to create a hudi table for S3 events metadata (e.g. S3
+ * put event data). It uses SQS for receiving the object key events. This can be useful
+ * for tracking S3 file activity over time. The hudi table created by this source is consumed by
+ * {@link S3EventsHoodieIncrSource} to apply changes to the hudi table corresponding to user data.
+ */
+public class S3EventsSource extends RowSource {
+
+  private final S3EventsMetaSelector pathSelector;
+  private final List<Message> processedMessages = new ArrayList<>();
+  AmazonSQS sqs;
+
+  public S3EventsSource(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = S3EventsMetaSelector.createSourceSelector(props);
+    this.sqs = this.pathSelector.createAmazonSqsClient();
+  }
+
+  /**
+   * Fetches next events from the queue.
+   *
+   * @param lastCkptStr The last checkpoint instant string, empty if first run.
+   * @param sourceLimit Limit on the size of data to fetch. For {@link S3EventsSource},
+   *                    {@link org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config#S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH} is used.
+   * @return A pair of the dataset of event records and the next checkpoint instant string
+   */
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<List<String>, String> selectPathsWithLatestSqsMessage =
+        pathSelector.getNextEventsFromQueue(sqs, lastCkptStr, processedMessages);
+    if (selectPathsWithLatestSqsMessage.getLeft().isEmpty()) {
+      return Pair.of(Option.empty(), selectPathsWithLatestSqsMessage.getRight());
+    } else {
+      Dataset<String> eventRecords = sparkSession.createDataset(selectPathsWithLatestSqsMessage.getLeft(), Encoders.STRING());
+      return Pair.of(
+          Option.of(sparkSession.read().json(eventRecords)),
+          selectPathsWithLatestSqsMessage.getRight());
+    }
+  }
+
+  @Override
+  public void onCommit(String lastCkptStr) {
+    pathSelector.deleteProcessedMessages(sqs, pathSelector.queueUrl, processedMessages);
+    processedMessages.clear();
+  }
+}
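
The fetchNextBatch implementation above relies on Spark's ability to infer a schema from a Dataset of JSON strings. A minimal standalone sketch of that pattern; the event payload here is an illustrative fragment, not a full S3 notification:

    import java.util.Collections;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class JsonDatasetSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate();
        Dataset<String> events = spark.createDataset(
            Collections.singletonList(
                "{\"eventName\":\"ObjectCreated:Put\",\"eventTime\":\"2021-07-27T09:05:36.755Z\"}"),
            Encoders.STRING());
        // Same call chain as fetchNextBatch: the schema is inferred from the JSON strings.
        Dataset<Row> rows = spark.read().json(events);
        rows.printSchema();
        spark.stop();
      }
    }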
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
new file mode 100644
index 0000000..7252494
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This class has methods for processing cloud objects.
+ * It currently supports only AWS S3 objects and AWS SQS queue.
+ */
+public class CloudObjectsSelector {
+  public static final List<String> ALLOWED_S3_EVENT_PREFIX =
+      Collections.singletonList("ObjectCreated");
+  public static final String S3_PREFIX = "s3://";
+  public static volatile Logger log = LogManager.getLogger(CloudObjectsSelector.class);
+  public static final String SQS_ATTR_APPROX_MESSAGES = "ApproximateNumberOfMessages";
+  static final String SQS_MODEL_MESSAGE = "Message";
+  static final String SQS_MODEL_EVENT_RECORDS = "Records";
+  static final String SQS_MODEL_EVENT_NAME = "eventName";
+  static final String S3_MODEL_EVENT_TIME = "eventTime";
+  static final String S3_FILE_SIZE = "fileSize";
+  static final String S3_FILE_PATH = "filePath";
+  public final String queueUrl;
+  public final int longPollWait;
+  public final int maxMessagesPerRequest;
+  public final int maxMessagePerBatch;
+  public final int visibilityTimeout;
+  public final TypedProperties props;
+  public final String fsName;
+  private final String regionName;
+
+  /**
+   * Cloud Objects Selector Class. {@link CloudObjectsSelector}
+   */
+  public CloudObjectsSelector(TypedProperties props) {
+    DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.S3_SOURCE_QUEUE_URL, Config.S3_SOURCE_QUEUE_REGION));
+    this.props = props;
+    this.queueUrl = props.getString(Config.S3_SOURCE_QUEUE_URL);
+    this.regionName = props.getString(Config.S3_SOURCE_QUEUE_REGION);
+    this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS, "s3").toLowerCase();
+    this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20);
+    this.maxMessagePerBatch = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5);
+    this.visibilityTimeout = props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30);
+    this.maxMessagesPerRequest = 10;
+  }
+
+  /**
+   * Get SQS queue attributes.
+   *
+   * @param sqsClient AWSClient for sqsClient
+   * @param queueUrl  queue full url
+   * @return map of attributes needed
+   */
+  protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient, String queueUrl) {
+    GetQueueAttributesResult queueAttributesResult = sqsClient.getQueueAttributes(
+        new GetQueueAttributesRequest(queueUrl).withAttributeNames(SQS_ATTR_APPROX_MESSAGES)
+    );
+    return queueAttributesResult.getAttributes();
+  }
+
+  /**
+   * Get the file attributes filePath, eventTime and size from JSONObject record.
+   *
+   * @param record of object event
+   * @return map of file attribute
+   */
+  protected Map<String, Object> getFileAttributesFromRecord(JSONObject record) throws UnsupportedEncodingException {
+    Map<String, Object> fileRecord = new HashMap<>();
+    String eventTimeStr = record.getString(S3_MODEL_EVENT_TIME);
+    long eventTime =
+        Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(eventTimeStr))).getTime();
+    JSONObject s3Object = record.getJSONObject("s3").getJSONObject("object");
+    String bucket = URLDecoder.decode(record.getJSONObject("s3").getJSONObject("bucket").getString("name"), "UTF-8");
+    String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
+    String filePath = this.fsName + "://" + bucket + "/" + key;
+    fileRecord.put(S3_MODEL_EVENT_TIME, eventTime);
+    fileRecord.put(S3_FILE_SIZE, s3Object.getLong("size"));
+    fileRecord.put(S3_FILE_PATH, filePath);
+    return fileRecord;
+  }
+
+  /**
+   * Amazon SQS Client Builder.
+   */
+  public AmazonSQS createAmazonSqsClient() {
+    return AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build();
+  }
+
+  /**
+   * List messages from queue.
+   */
+  protected List<Message> getMessagesToProcess(
+      AmazonSQS sqsClient,
+      String queueUrl,
+      int longPollWait,
+      int visibilityTimeout,
+      int maxMessagePerBatch,
+      int maxMessagesPerRequest) {
+    List<Message> messagesToProcess = new ArrayList<>();
+    ReceiveMessageRequest receiveMessageRequest =
+        new ReceiveMessageRequest()
+            .withQueueUrl(queueUrl)
+            .withWaitTimeSeconds(longPollWait)
+            .withVisibilityTimeout(visibilityTimeout);
+    receiveMessageRequest.setMaxNumberOfMessages(maxMessagesPerRequest);
+    // Get count for available messages
+    Map<String, String> queueAttributesResult = getSqsQueueAttributes(sqsClient, queueUrl);
+    long approxMessagesAvailable = Long.parseLong(queueAttributesResult.get(SQS_ATTR_APPROX_MESSAGES));
+    log.info("Approximately " + approxMessagesAvailable + " messages available in queue.");
+    long numMessagesToProcess = Math.min(approxMessagesAvailable, maxMessagePerBatch);
+    for (int i = 0;
+         i < (int) Math.ceil((double) numMessagesToProcess / maxMessagesPerRequest);
+         ++i) {
+      List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
+      log.debug("Number of messages: " + messages.size());
+      messagesToProcess.addAll(messages);
+      if (messages.isEmpty()) {
+        // ApproximateNumberOfMessages value is eventually consistent.
+        // So, we still need to check and break if there are no messages.
+        break;
+      }
+    }
+    return messagesToProcess;
+  }
+
+  /**
+   * Create partitions of a list using a specific batch size. We can't use a third-party API for this
+   * functionality due to https://github.com/apache/hudi/blob/master/style/checkstyle.xml#L270
+   */
+  protected List<List<Message>> createListPartitions(List<Message> singleList, int eachBatchSize) {
+    List<List<Message>> listPartitions = new ArrayList<>();
+    if (singleList.size() == 0 || eachBatchSize < 1) {
+      return listPartitions;
+    }
+    for (int start = 0; start < singleList.size(); start += eachBatchSize) {
+      int end = Math.min(start + eachBatchSize, singleList.size());
+      if (start > end) {
+        throw new IndexOutOfBoundsException(
+            "Index " + start + " is out of the list range <0," + (singleList.size() - 1) + ">");
+      }
+      listPartitions.add(new ArrayList<>(singleList.subList(start, end)));
+    }
+    return listPartitions;
+  }
+
+  /**
+   * Delete batch of messages from queue.
+   */
+  protected void deleteBatchOfMessages(AmazonSQS sqs, String queueUrl, List<Message> messagesToBeDeleted) {
+    DeleteMessageBatchRequest deleteBatchReq =
+        new DeleteMessageBatchRequest().withQueueUrl(queueUrl);
+    List<DeleteMessageBatchRequestEntry> deleteEntries = deleteBatchReq.getEntries();
+    for (Message message : messagesToBeDeleted) {
+      deleteEntries.add(
+          new DeleteMessageBatchRequestEntry()
+              .withId(message.getMessageId())
+              .withReceiptHandle(message.getReceiptHandle()));
+    }
+    DeleteMessageBatchResult deleteResult = sqs.deleteMessageBatch(deleteBatchReq);
+    List<String> deleteFailures =
+        deleteResult.getFailed().stream()
+            .map(BatchResultErrorEntry::getId)
+            .collect(Collectors.toList());
+    if (!deleteFailures.isEmpty()) {
+      log.warn(
+          "Failed to delete "
+              + deleteFailures.size()
+              + " messages out of "
+              + deleteEntries.size()
+              + " from queue.");
+    } else {
+      log.info("Successfully deleted " + deleteEntries.size() + " messages from queue.");
+    }
+  }
+
+  /**
+   * Delete Queue Messages after hudi commit. This method will be invoked by source.onCommit.
+   */
+  public void deleteProcessedMessages(AmazonSQS sqs, String queueUrl, List<Message> processedMessages) {
+    if (!processedMessages.isEmpty()) {
+      // create batches for deletion; SQS DeleteMessageBatchRequest only accepts max 10 entries
+      List<List<Message>> deleteBatches = createListPartitions(processedMessages, 10);
+      for (List<Message> deleteBatch : deleteBatches) {
+        deleteBatchOfMessages(sqs, queueUrl, deleteBatch);
+      }
+    }
+  }
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private static final String HOODIE_DELTASTREAMER_S3_SOURCE = "hoodie.deltastreamer.s3.source";
+    /**
+     * {@value #S3_SOURCE_QUEUE_URL} is the queue url for cloud object events.
+     */
+    public static final String S3_SOURCE_QUEUE_URL = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.url";
+
+    /**
+     * {@value #S3_SOURCE_QUEUE_REGION} is the case-sensitive region name of the cloud provider for the queue. For example, "us-east-1".
+     */
+    public static final String S3_SOURCE_QUEUE_REGION = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.region";
+
+    /**
+     * {@value #S3_SOURCE_QUEUE_FS} is the file system corresponding to the queue. For example, for AWS SQS it is s3/s3a.
+     */
+    public static final String S3_SOURCE_QUEUE_FS = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.fs";
+
+    /**
+     * {@value #S3_QUEUE_LONG_POLL_WAIT} is the long poll wait time in seconds. If set to 0,
+     * the client will fetch on a short-poll basis.
+     */
+    public static final String S3_QUEUE_LONG_POLL_WAIT =
+        HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.long.poll.wait";
+
+    /**
+     * {@value #S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH} is the maximum number of messages per batch of a
+     * delta streamer run. The source will process at most this many messages at a time.
+     */
+    public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH =
+        HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch";
+
+    /**
+     * {@value #S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT} is the visibility timeout for messages in the queue.
+     * After we consume a message, the queue moves it to an in-flight state; such messages
+     * can't be consumed again by the source for this timeout period.
+     */
+    public static final String S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT =
+        HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.visibility.timeout";
+
+    /**
+     * {@value #SOURCE_INPUT_SELECTOR} is the class name of the source input selector.
+     */
+    public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector";
+  }
+}
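
Taken together, the Config keys above might be wired up as follows; a minimal sketch with placeholder values (the queue URL is illustrative, and the optional values shown are simply the constructor defaults):

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector;

    class SelectorConfigSketch {
      static CloudObjectsSelector configure() {
        TypedProperties props = new TypedProperties();
        // Required (enforced via checkRequiredProperties in the constructor).
        props.setProperty("hoodie.deltastreamer.s3.source.queue.url",
            "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events"); // placeholder
        props.setProperty("hoodie.deltastreamer.s3.source.queue.region", "us-east-1");
        // Optional, shown with the constructor defaults.
        props.setProperty("hoodie.deltastreamer.s3.source.queue.fs", "s3");
        props.setProperty("hoodie.deltastreamer.s3.source.queue.long.poll.wait", "20");
        props.setProperty("hoodie.deltastreamer.s3.source.queue.max.messages.per.batch", "5");
        props.setProperty("hoodie.deltastreamer.s3.source.queue.visibility.timeout", "30");
        return new CloudObjectsSelector(props);
      }
    }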
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
new file mode 100644
index 0000000..68ac7ab
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * S3 events metadata selector class. This class provides methods to process the
+ * messages from SQS for {@link org.apache.hudi.utilities.sources.S3EventsSource}.
+ */
+public class S3EventsMetaSelector extends CloudObjectsSelector {
+
+  private static final String S3_EVENT_RESPONSE_ELEMENTS = "responseElements";
+
+  /**
+   * Cloud Objects Meta Selector Class. {@link CloudObjectsSelector}
+   */
+  public S3EventsMetaSelector(TypedProperties props) {
+    super(props);
+  }
+
+  /**
+   * Factory method for creating a custom CloudObjectsMetaSelector. The default selector to use is {@link
+   * S3EventsMetaSelector}.
+   */
+  public static S3EventsMetaSelector createSourceSelector(TypedProperties props) {
+    String sourceSelectorClass =
+        props.getString(
+            S3EventsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+            S3EventsMetaSelector.class.getName());
+    try {
+      S3EventsMetaSelector selector =
+          (S3EventsMetaSelector)
+              ReflectionUtils.loadClass(
+                  sourceSelectorClass, new Class<?>[] {TypedProperties.class}, props);
+
+      log.info("Using path selector " + selector.getClass().getName());
+      return selector;
+    } catch (Exception e) {
+      throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
+    }
+  }
+
+  /**
+   * List messages from the queue, filtering out ineligible events while doing so. It will also delete the
+   * ineligible messages from the queue.
+   *
+   * @param processedMessages list of processed messages, to which the consumed messages are added
+   * @return the filtered list of valid S3 events in SQS.
+   */
+  protected List<Map<String, Object>> getValidEvents(AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+    List<Message> messages =
+        getMessagesToProcess(
+            sqs,
+            this.queueUrl,
+            this.longPollWait,
+            this.visibilityTimeout,
+            this.maxMessagePerBatch,
+            this.maxMessagesPerRequest);
+    return processAndDeleteInvalidMessages(processedMessages, messages);
+  }
+
+  private List<Map<String, Object>> processAndDeleteInvalidMessages(List<Message> processedMessages,
+                                                                    List<Message> messages) throws IOException {
+    List<Map<String, Object>> validEvents = new ArrayList<>();
+    for (Message message : messages) {
+      JSONObject messageBody = new JSONObject(message.getBody());
+      Map<String, Object> messageMap;
+      ObjectMapper mapper = new ObjectMapper();
+      if (messageBody.has(SQS_MODEL_MESSAGE)) {
+        // If this message is from S3Event -> SNS -> SQS
+        messageMap = (Map<String, Object>) mapper.readValue(messageBody.getString(SQS_MODEL_MESSAGE), Map.class);
+      } else {
+        // If this message is from S3Event -> SQS
+        messageMap = (Map<String, Object>) mapper.readValue(messageBody.toString(), Map.class);
+      }
+      if (messageMap.containsKey(SQS_MODEL_EVENT_RECORDS)) {
+        List<Map<String, Object>> events = (List<Map<String, Object>>) messageMap.get(SQS_MODEL_EVENT_RECORDS);
+        for (Map<String, Object> event : events) {
+          event.remove(S3_EVENT_RESPONSE_ELEMENTS);
+          String eventName = (String) event.get(SQS_MODEL_EVENT_NAME);
+          // filter only allowed s3 event types
+          if (ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+            validEvents.add(event);
+          } else {
+            log.debug(String.format("This S3 event %s is not allowed, so ignoring it.", eventName));
+          }
+        }
+      } else {
+        log.debug(String.format("Message is not in the expected format or it's s3:TestEvent. Message: %s", message));
+      }
+      processedMessages.add(message);
+    }
+    return validEvents;
+  }
+
+  /**
+   * Get the list of events from queue.
+   *
+   * @param lastCheckpointStr The last checkpoint instant string, empty if first run.
+   * @return A pair of the list of event records and the next checkpoint instant string.
+   */
+  public Pair<List<String>, String> getNextEventsFromQueue(AmazonSQS sqs,
+                                                           Option<String> lastCheckpointStr,
+                                                           List<Message> processedMessages) {
+    processedMessages.clear();
+    log.info("Reading messages....");
+    try {
+      log.info("Start Checkpoint : " + lastCheckpointStr);
+      List<Map<String, Object>> eventRecords = getValidEvents(sqs, processedMessages);
+      log.info("Number of valid events: " + eventRecords.size());
+      List<String> filteredEventRecords = new ArrayList<>();
+      long newCheckpointTime = eventRecords.stream()
+          .mapToLong(eventRecord -> Date.from(Instant.from(
+                  DateTimeFormatter.ISO_INSTANT.parse((String) eventRecord.get(S3_MODEL_EVENT_TIME))))
+              .getTime()).max().orElse(lastCheckpointStr.map(Long::parseLong).orElse(0L));
+
+      for (Map<String, Object> eventRecord : eventRecords) {
+        filteredEventRecords.add(new ObjectMapper().writeValueAsString(eventRecord).replace("%3D", "="));
+      }
+      return new ImmutablePair<>(filteredEventRecords, String.valueOf(newCheckpointTime));
+    } catch (JSONException | IOException e) {
+      throw new HoodieException("Unable to read from SQS: ", e);
+    }
+  }
+}
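
A note on the checkpoint format used by getNextEventsFromQueue above: the checkpoint is the epoch-millis timestamp of the newest event in the batch. A quick sanity check, reusing the event time that appears in the tests below:

    import java.time.Instant;
    import java.time.format.DateTimeFormatter;

    class CheckpointSketch {
      public static void main(String[] args) {
        long ckpt = Instant.from(
            DateTimeFormatter.ISO_INSTANT.parse("2021-07-27T09:05:36.755Z")).toEpochMilli();
        System.out.println(ckpt); // prints 1627376736755, matching the test assertion
      }
    }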
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
new file mode 100644
index 0000000..3d89dc2
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.testutils.sources.AbstractCloudObjectsSourceTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_FS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Basic tests for {@link S3EventsSource}.
+ */
+public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase {
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    this.dfsRoot = dfsBasePath + "/parquetFiles";
+    this.fileSuffix = ".parquet";
+    dfs.mkdirs(new Path(dfsRoot));
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadingFromSource() throws IOException {
+
+    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareCloudObjectSource());
+
+    // 1. Extract without any checkpoint => (no data available)
+    generateMessageInQueue(null);
+    assertEquals(
+        Option.empty(),
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+
+    // 2. Extract without any checkpoint =>  (adding new file)
+    generateMessageInQueue("1");
+
+    // Test fetching Avro format
+    InputBatch<JavaRDD<GenericRecord>> fetch1 =
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(1, fetch1.getBatch().get().count());
+
+    // 3. Produce new data, extract new data
+    generateMessageInQueue("2");
+    // Test fetching Avro format
+    InputBatch<JavaRDD<GenericRecord>> fetch2 =
+        sourceFormatAdapter.fetchNewDataInAvroFormat(
+            Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(1, fetch2.getBatch().get().count());
+
+    GenericRecord s3 = (GenericRecord) fetch2.getBatch().get().rdd().first().get("s3");
+    GenericRecord s3Object = (GenericRecord) s3.get("object");
+    assertEquals("2.parquet", s3Object.get("key").toString());
+  }
+
+  @Override
+  public Source prepareCloudObjectSource() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
+    props.setProperty(S3_SOURCE_QUEUE_REGION, regionName);
+    props.setProperty(S3_SOURCE_QUEUE_FS, "hdfs");
+    S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession, null);
+    dfsSource.sqs = this.sqs;
+    return dfsSource;
+  }
+
+  @Override
+  public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(records), path);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java
new file mode 100644
index 0000000..02eaccf
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.JSONObject;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_FILE_PATH;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_FILE_SIZE;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_MODEL_EVENT_TIME;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_MODEL_EVENT_RECORDS;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_MODEL_MESSAGE;
+import static org.apache.hudi.utilities.testutils.CloudObjectTestUtils.deleteMessagesInQueue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCloudObjectsSelector extends HoodieClientTestHarness {
+
+  static final String REGION_NAME = "us-east-1";
+
+  TypedProperties props;
+  String sqsUrl;
+
+  @Mock
+  AmazonSQS sqs;
+
+  @Mock
+  private CloudObjectsSelector cloudObjectsSelector;
+
+  @BeforeEach
+  void setUp() {
+    initSparkContexts();
+    initPath();
+    initFileSystem();
+    MockitoAnnotations.initMocks(this);
+
+    props = new TypedProperties();
+    sqsUrl = "test-queue";
+    props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
+    props.setProperty(S3_SOURCE_QUEUE_REGION, REGION_NAME);
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    Mockito.reset(cloudObjectsSelector);
+    cleanupResources();
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {CloudObjectsSelector.class})
+  public void testSqsQueueAttributesShouldReturnsRequiredAttribute(Class<?> clazz) {
+    CloudObjectsSelector selector =
+        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
+
+    // setup the queue attributes
+    CloudObjectTestUtils.setMessagesInQueue(sqs, null);
+
+    // test the return values
+    Map<String, String> queueAttributes = selector.getSqsQueueAttributes(sqs, sqsUrl);
+    assertEquals(1, queueAttributes.size());
+    // ApproximateNumberOfMessages is a required queue attribute for Cloud object selector
+    assertEquals("0", queueAttributes.get(SQS_ATTR_APPROX_MESSAGES));
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {CloudObjectsSelector.class})
+  public void testFileAttributesFromRecordShouldReturnsExpectOutput(Class<?> clazz)
+      throws IOException {
+
+    CloudObjectsSelector selector =
+        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
+
+    // setup s3 record
+    String bucket = "test-bucket";
+    String key = "test/year=test1/month=test2/day=test3/part-foo-bar.snappy.parquet";
+
+    String s3Records =
+        "{\n  \"Type\" : \"Notification\",\n  \"MessageId\" : \"1\",\n  \"TopicArn\" : \"arn:aws:sns:foo:123:"
+            + "foo-bar\",\n  \"Subject\" : \"Amazon S3 Notification\",\n  \"Message\" : \"{\\\"Records\\\":"
+            + "[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us"
+            + "-west-2\\\",\\\"eventTime\\\":\\\"2021-07-27T09:05:36.755Z\\\",\\\"eventName\\\":\\\"ObjectCreated"
+            + ":Copy\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:test\\\"},\\\"requestParameters\\\":"
+            + "{\\\"sourceIPAddress\\\":\\\"0.0.0.0\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\""
+            + "test\\\",\\\"x-amz-id-2\\\":\\\"foobar\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\""
+            + "configurationId\\\":\\\"foobar\\\",\\\"bucket\\\":{\\\"name\\\":\\\""
+            + bucket
+            + "\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"foo\\\"},\\\"arn\\\":\\\"arn:aws:s3:::foo\\\"}"
+            + ",\\\"object\\\":{\\\"key\\\":\\\""
+            + key
+            + "\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}";
+    JSONObject messageBody = new JSONObject(s3Records);
+    Map<String, Object> messageMap = new HashMap<>();
+    if (messageBody.has(SQS_MODEL_MESSAGE)) {
+      ObjectMapper mapper = new ObjectMapper();
+      messageMap =
+          (Map<String, Object>) mapper.readValue(messageBody.getString(SQS_MODEL_MESSAGE), Map.class);
+    }
+    List<Map<String, Object>> records = (List<Map<String, Object>>) messageMap.get(SQS_MODEL_EVENT_RECORDS);
+
+    // test the return values
+    Map<String, Object> fileAttributes =
+        selector.getFileAttributesFromRecord(new JSONObject(records.get(0)));
+
+    assertEquals(3, fileAttributes.size());
+    assertEquals(123L, (long) fileAttributes.get(S3_FILE_SIZE));
+    assertEquals(S3_PREFIX + bucket + "/" + key, fileAttributes.get(S3_FILE_PATH));
+    assertEquals(1627376736755L, (long) fileAttributes.get(S3_MODEL_EVENT_TIME));
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {CloudObjectsSelector.class})
+  public void testCreateListPartitionsReturnsExpectedSetOfBatch(Class<?> clazz) {
+
+    CloudObjectsSelector selector =
+        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
+
+    // setup lists
+    List<Message> testSingleList = new ArrayList<>();
+    testSingleList.add(new Message().addAttributesEntry("id", "1"));
+    testSingleList.add(new Message().addAttributesEntry("id", "2"));
+    testSingleList.add(new Message().addAttributesEntry("id", "3"));
+    testSingleList.add(new Message().addAttributesEntry("id", "4"));
+    testSingleList.add(new Message().addAttributesEntry("id", "5"));
+
+    List<Message> expectedFirstList = new ArrayList<>();
+    expectedFirstList.add(new Message().addAttributesEntry("id", "1"));
+    expectedFirstList.add(new Message().addAttributesEntry("id", "2"));
+
+    List<Message> expectedSecondList = new ArrayList<>();
+    expectedSecondList.add(new Message().addAttributesEntry("id", "3"));
+    expectedSecondList.add(new Message().addAttributesEntry("id", "4"));
+
+    List<Message> expectedFinalList = new ArrayList<>();
+    expectedFinalList.add(new Message().addAttributesEntry("id", "5"));
+
+    //  test the return values
+    List<List<Message>> partitionedList = selector.createListPartitions(testSingleList, 2);
+
+    assertEquals(3, partitionedList.size());
+    assertEquals(expectedFirstList, partitionedList.get(0));
+    assertEquals(expectedSecondList, partitionedList.get(1));
+    assertEquals(expectedFinalList, partitionedList.get(2));
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {CloudObjectsSelector.class})
+  public void testCreateListPartitionsReturnsEmptyIfBatchSizeIsZero(Class<?> clazz) {
+
+    CloudObjectsSelector selector =
+        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
+
+    // setup lists
+    List<Message> testSingleList = new ArrayList<>();
+    testSingleList.add(new Message().addAttributesEntry("id", "1"));
+    testSingleList.add(new Message().addAttributesEntry("id", "2"));
+
+    //  test the return values
+    List<List<Message>> partitionedList = selector.createListPartitions(testSingleList, 0);
+
+    assertEquals(0, partitionedList.size());
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {CloudObjectsSelector.class})
+  public void testOnCommitDeleteProcessedMessages(Class<?> clazz) {
+
+    CloudObjectsSelector selector =
+        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
+
+    // setup lists
+    List<Message> testSingleList = new ArrayList<>();
+    testSingleList.add(
+        new Message()
+            .addAttributesEntry("MessageId", "1")
+            .addAttributesEntry("ReceiptHandle", "1"));
+    testSingleList.add(
+        new Message()
+            .addAttributesEntry("MessageId", "2")
+            .addAttributesEntry("ReceiptHandle", "1"));
+
+    deleteMessagesInQueue(sqs);
+
+    //  test the return values
+    selector.deleteProcessedMessages(sqs, sqsUrl, testSingleList);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
new file mode 100644
index 0000000..2208543
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.hadoop.fs.Path;
+import org.json.JSONObject;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
+import static org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector.REGION_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestS3EventsMetaSelector extends HoodieClientTestHarness {
+
+  TypedProperties props;
+  String sqsUrl;
+
+  @Mock
+  AmazonSQS sqs;
+
+  @Mock
+  private S3EventsMetaSelector s3EventsMetaSelector;
+
+  @BeforeEach
+  void setUp() {
+    initSparkContexts();
+    initPath();
+    initFileSystem();
+    MockitoAnnotations.initMocks(this);
+
+    props = new TypedProperties();
+    sqsUrl = "test-queue";
+    props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
+    props.setProperty(S3_SOURCE_QUEUE_REGION, REGION_NAME);
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    Mockito.reset(s3EventsMetaSelector);
+    cleanupResources();
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {S3EventsMetaSelector.class})
+  public void testNextEventsFromQueueShouldReturnsEventsFromQueue(Class<?> clazz) {
+    S3EventsMetaSelector selector = (S3EventsMetaSelector) ReflectionUtils.loadClass(clazz.getName(), props);
+    // setup s3 record
+    String bucket = "test-bucket";
+    String key = "part-foo-bar.snappy.parquet";
+    Path path = new Path(bucket, key);
+    CloudObjectTestUtils.setMessagesInQueue(sqs, path);
+
+    List<Message> processed = new ArrayList<>();
+
+    // test the return values
+    Pair<List<String>, String> eventFromQueue =
+        selector.getNextEventsFromQueue(sqs, Option.empty(), processed);
+
+    assertEquals(1, eventFromQueue.getLeft().size());
+    assertEquals(1, processed.size());
+    assertEquals(
+        key,
+        new JSONObject(eventFromQueue.getLeft().get(0))
+            .getJSONObject("s3")
+            .getJSONObject("object")
+            .getString("key"));
+    assertEquals("1627376736755", eventFromQueue.getRight());
+  }
+}
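
For context, the flow this test exercises can also be sketched outside JUnit. A minimal sketch, assuming a real SQS client in place of the Mockito mock and the props-based constructor that the test reaches via ReflectionUtils; the queue URL, account id, and region are placeholders:

    package org.apache.hudi.utilities.sources.helpers;

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.services.sqs.model.Message;

    import java.util.ArrayList;
    import java.util.List;

    public class S3EventsMetaSelectorSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Placeholder queue URL and region; point these at a real S3-events SQS queue.
        props.setProperty(CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL,
            "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events");
        props.setProperty(CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION, "us-east-1");

        S3EventsMetaSelector selector = new S3EventsMetaSelector(props);
        AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build();

        List<Message> processedMessages = new ArrayList<>();
        // Option.empty() stands in for "no previous checkpoint", as in the test above.
        Pair<List<String>, String> batch =
            selector.getNextEventsFromQueue(sqs, Option.empty(), processedMessages);

        // batch.getLeft() holds raw S3 event JSON strings; batch.getRight() is the new
        // checkpoint (the max event time seen, in epoch millis per the assertion above).
      }
    }
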
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java
new file mode 100644
index 0000000..49ea2de
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Utility class for unit testing CloudObject-related sources.
+ */
+public class CloudObjectTestUtils {
+
+  /**
+   * Sets a return value on the mocked SQS instance: adds a new message (S3 event) and sets the
+   * ApproximateNumberOfMessages attribute of the queue.
+   *
+   * @param sqs  Mocked instance of AmazonSQS
+   * @param path Absolute path of the file in the FileSystem
+   */
+  public static void setMessagesInQueue(AmazonSQS sqs, Path path) {
+
+    ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
+    String approximateNumberOfMessages = "0";
+
+    if (path != null) {
+      String body =
+          "{\n  \"Type\" : \"Notification\",\n  \"MessageId\" : \"1\",\n  
\"TopicArn\" : \"arn:aws:sns:foo:123:"
+              + "foo-bar\",\n  \"Subject\" : \"Amazon S3 Notification\",\n  
\"Message\" : \"{\\\"Records\\\":"
+              + 
"[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us"
+              + 
"-west-2\\\",\\\"eventTime\\\":\\\"2021-07-27T09:05:36.755Z\\\",\\\"eventName\\\":\\\"ObjectCreated"
+              + 
":Copy\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:test\\\"},\\\"requestParameters\\\":"
+              + 
"{\\\"sourceIPAddress\\\":\\\"0.0.0.0\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\""
+              + 
"test\\\",\\\"x-amz-id-2\\\":\\\"foobar\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\""
+              + 
"configurationId\\\":\\\"foobar\\\",\\\"bucket\\\":{\\\"name\\\":\\\""
+              + path.getParent().toString().replace("hdfs://", "")
+              + 
"\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"foo\\\"},\\\"arn\\\":\\\"arn:aws:s3:::foo\\\"}"
+              + ",\\\"object\\\":{\\\"key\\\":\\\""
+              + path.getName()
+              + 
"\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}";
+
+      Message message = new Message();
+      message.setReceiptHandle("1");
+      message.setMessageId("1");
+      message.setBody(body);
+
+      List<Message> messages = new ArrayList<>();
+      messages.add(message);
+      receiveMessageResult.setMessages(messages);
+      approximateNumberOfMessages = "1";
+    }
+    when(sqs.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
+    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
+        .thenReturn(
+            new GetQueueAttributesResult()
+                .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, approximateNumberOfMessages));
+  }
+
+  /**
+   * Mocks the sqs.deleteMessageBatch() call on the queue.
+   *
+   * @param sqs Mocked instance of AmazonSQS
+   */
+  public static void deleteMessagesInQueue(AmazonSQS sqs) {
+    when(sqs.deleteMessageBatch(any(DeleteMessageBatchRequest.class)))
+        .thenReturn(new DeleteMessageBatchResult());
+  }
+}
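
Putting these helpers to work: a short sketch of a test body that drives both stubs. It assumes a test class with a Mockito `@Mock AmazonSQS sqs` field initialized via MockitoAnnotations.initMocks(this), as in the tests above; the bucket and key names are placeholders:

    // Assumed imports: org.apache.hadoop.fs.Path,
    //   com.amazonaws.services.sqs.model.ReceiveMessageRequest / ReceiveMessageResult,
    //   static org.junit.jupiter.api.Assertions.assertEquals
    Path path = new Path("test-bucket", "part-0000.snappy.parquet"); // placeholder object
    CloudObjectTestUtils.setMessagesInQueue(sqs, path);  // queue now yields one canned S3 event
    CloudObjectTestUtils.deleteMessagesInQueue(sqs);     // deleteMessageBatch() returns an empty result

    // Any ReceiveMessageRequest now yields the single canned message.
    ReceiveMessageResult result = sqs.receiveMessage(new ReceiveMessageRequest());
    assertEquals(1, result.getMessages().size());

    // A null path stubs an empty queue (ApproximateNumberOfMessages = "0").
    CloudObjectTestUtils.setMessagesInQueue(sqs, null);
    assertEquals(0, sqs.receiveMessage(new ReceiveMessageRequest()).getMessages().size());
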
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
new file mode 100644
index 0000000..dfd3795
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.testutils.sources;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An abstract test base for {@link Source} using CloudObjects as the file system.
+ */
+public abstract class AbstractCloudObjectsSourceTestBase extends UtilitiesTestBase {
+
+  protected FilebasedSchemaProvider schemaProvider;
+  protected String dfsRoot;
+  protected String fileSuffix;
+  protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+  protected boolean useFlattenedSchema = false;
+  protected String sqsUrl = "test-queue";
+  protected String regionName = "us-east-1";
+  @Mock
+  protected AmazonSQS sqs;
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  /**
+   * Prepares the specific {@link Source} to test by passing in the necessary configurations.
+   *
+   * @return A {@link Source} using DFS as the file system.
+   */
+  protected abstract Source prepareCloudObjectSource();
+
+  /**
+   * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS.
+   *
+   * @param records Test data.
+   * @param path    The path in {@link Path} of the file to write.
+   */
+  protected abstract void writeNewDataToFile(List<HoodieRecord> records, Path path)
+      throws IOException;
+
+  /**
+   * Generates a batch of test data and writes the data to a file.
+   *
+   * @param filename    The name of the file.
+   * @param instantTime The commit time.
+   * @param n           The number of records to generate.
+   * @return The file path.
+   */
+  protected Path generateOneFile(String filename, String instantTime, int n) throws IOException {
+    Path path = new Path(dfsRoot, filename + fileSuffix);
+    writeNewDataToFile(dataGenerator.generateInserts(instantTime, n, useFlattenedSchema), path);
+    return path;
+  }
+
+  public void generateMessageInQueue(String filename) {
+    Path path = null;
+    if (filename != null) {
+      path = new Path(dfsRoot, filename + fileSuffix);
+    }
+    CloudObjectTestUtils.setMessagesInQueue(sqs, path);
+  }
+}
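
For orientation, a concrete subclass only has to provide the two abstract hooks. A minimal, hypothetical sketch; the class name, the dfsRoot value, and both method bodies are illustrative placeholders, not part of this commit:

    package org.apache.hudi.utilities.testutils.sources;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.utilities.sources.Source;

    import org.apache.hadoop.fs.Path;
    import org.junit.jupiter.api.BeforeEach;

    import java.io.IOException;
    import java.util.List;

    // Hypothetical subclass: fill in the two abstract hooks for the source under test.
    public class TestMyCloudObjectsSource extends AbstractCloudObjectsSourceTestBase {

      @BeforeEach
      public void setup() throws Exception {
        super.setup();
        dfsRoot = "/tmp/parquetFiles"; // placeholder root for generated files
        fileSuffix = ".parquet";
      }

      @Override
      protected Source prepareCloudObjectSource() {
        // Build the source under test here, e.g. wired to the mocked `sqs` field;
        // the exact constructor is source-specific.
        throw new UnsupportedOperationException("placeholder");
      }

      @Override
      protected void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
        // Persist the generated records at `path` with whatever writer the
        // concrete source expects (e.g. a Parquet writer helper).
        throw new UnsupportedOperationException("placeholder");
      }
    }
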
diff --git a/pom.xml b/pom.xml
index 9f523aa..53d25fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,6 +153,7 @@
     <shadeSources>true</shadeSources>
     <zk-curator.version>2.7.1</zk-curator.version>
     <antlr.version>4.7</antlr.version>
+    <aws.sdk.version>1.12.22</aws.sdk.version>
   </properties>
 
   <scm>
