This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5cc96e8 [HUDI-1897] Deltastreamer source for AWS S3 (#3433)
5cc96e8 is described below
commit 5cc96e85c13e56e866ca15063bf08c204d967377
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Aug 14 17:55:10 2021 +0530
[HUDI-1897] Deltastreamer source for AWS S3 (#3433)
- Added two sources for a two-stage pipeline. a. S3EventsSource that fetches
events from SQS and ingests to a meta hoodie table. b. S3EventsHoodieIncrSource
reads S3 events from this meta hoodie table, fetches actual objects from S3 and
ingests to the sink hoodie table.
- Added selectors to assist in S3EventsSource.
Co-authored-by: Satish M
<[email protected]>
Co-authored-by: Vinoth Chandar <[email protected]>
---
hudi-utilities/pom.xml | 8 +
.../hudi/utilities/sources/HoodieIncrSource.java | 32 ++-
.../sources/S3EventsHoodieIncrSource.java | 137 ++++++++++
.../hudi/utilities/sources/S3EventsSource.java | 87 +++++++
.../sources/helpers/CloudObjectsSelector.java | 280 +++++++++++++++++++++
.../sources/helpers/S3EventsMetaSelector.java | 161 ++++++++++++
.../hudi/utilities/sources/TestS3EventsSource.java | 112 +++++++++
.../sources/helpers/TestCloudObjectsSelector.java | 226 +++++++++++++++++
.../sources/helpers/TestS3EventsMetaSelector.java | 105 ++++++++
.../utilities/testutils/CloudObjectTestUtils.java | 98 ++++++++
.../AbstractCloudObjectsSourceTestBase.java | 114 +++++++++
pom.xml | 1 +
12 files changed, 1348 insertions(+), 13 deletions(-)
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index c8a58d5..5ef0465 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -402,6 +402,14 @@
<scope>test</scope>
</dependency>
+ <!-- AWS Services -->
+ <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sqs -->
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-sqs</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+
<!-- Hive - Test -->
<dependency>
<groupId>${hive.groupid}</groupId>
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index dd841f4..a217e6b 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -42,45 +42,51 @@ public class HoodieIncrSource extends RowSource {
private static final Logger LOG =
LogManager.getLogger(HoodieIncrSource.class);
- protected static class Config {
+ static class Config {
/**
* {@value #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie
table.
*/
- private static final String HOODIE_SRC_BASE_PATH =
"hoodie.deltastreamer.source.hoodieincr.path";
+ static final String HOODIE_SRC_BASE_PATH =
"hoodie.deltastreamer.source.hoodieincr.path";
/**
* {@value #NUM_INSTANTS_PER_FETCH} allows the max number of instants
whose changes can be incrementally fetched.
*/
- private static final String NUM_INSTANTS_PER_FETCH =
"hoodie.deltastreamer.source.hoodieincr.num_instants";
- private static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1;
+ static final String NUM_INSTANTS_PER_FETCH =
"hoodie.deltastreamer.source.hoodieincr.num_instants";
+ static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1;
/**
* {@value #HOODIE_SRC_PARTITION_FIELDS} specifies partition fields that
needs to be added to source table after
* parsing _hoodie_partition_path.
*/
- private static final String HOODIE_SRC_PARTITION_FIELDS =
"hoodie.deltastreamer.source.hoodieincr.partition.fields";
+ static final String HOODIE_SRC_PARTITION_FIELDS =
"hoodie.deltastreamer.source.hoodieincr.partition.fields";
/**
* {@value #HOODIE_SRC_PARTITION_EXTRACTORCLASS} PartitionValueExtractor
class to extract partition fields from
* _hoodie_partition_path.
*/
- private static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS =
+ static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS =
"hoodie.deltastreamer.source.hoodieincr.partition.extractor.class";
- private static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS =
+ static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS =
SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
/**
* {@value #READ_LATEST_INSTANT_ON_MISSING_CKPT} allows delta-streamer to
incrementally fetch from latest committed
* instant when checkpoint is not provided.
*/
- private static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
+ static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
"hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
- private static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT =
false;
+ static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;
+
+ /**
+ * {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading
dataset. Default value is parquet.
+ */
+ static final String SOURCE_FILE_FORMAT =
"hoodie.deltastreamer.source.hoodieincr.file.format";
+ static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
}
public HoodieIncrSource(TypedProperties props, JavaSparkContext
sparkContext, SparkSession sparkSession,
- SchemaProvider schemaProvider) {
+ SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
}
@@ -123,10 +129,10 @@ public class HoodieIncrSource extends RowSource {
/*
* log.info("Partition Fields are : (" + partitionFields + "). Initial
Source Schema :" + source.schema());
- *
+ *
* StructType newSchema = new StructType(source.schema().fields()); for
(String field : partitionFields) { newSchema
* = newSchema.add(field, DataTypes.StringType, true); }
- *
+ *
* /** Validates if the commit time is sane and also generates Partition
fields from _hoodie_partition_path if
* configured
*
@@ -139,7 +145,7 @@ public class HoodieIncrSource extends RowSource {
* "#partition-fields != #partition-values-extracted"); List<Object>
rowObjs = new
*
ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq()));
rowObjs.addAll(partitionVals); return
* RowFactory.create(rowObjs.toArray()); } return row; },
RowEncoder.apply(newSchema));
- *
+ *
* log.info("Validated Source Schema :" + validated.schema());
*/
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
new file mode 100644
index 0000000..79e4abb
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;
+
+/**
+ * This source will use the S3 events meta information from hoodie table
generate by {@link S3EventsSource}.
+ */
+public class S3EventsHoodieIncrSource extends HoodieIncrSource {
+
+ private static final Logger LOG =
LogManager.getLogger(S3EventsHoodieIncrSource.class);
+
+ static class Config {
+ // control whether we do existence check for files before consuming them
+ static final String ENABLE_EXISTS_CHECK =
"hoodie.deltastreamer.source.s3incr.check.file.exists";
+ static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false;
+ }
+
+ public S3EventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ super(props, sparkContext, sparkSession, schemaProvider);
+ }
+
+ @Override
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
+ DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(HOODIE_SRC_BASE_PATH));
+ String srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+ int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH,
DEFAULT_NUM_INSTANTS_PER_FETCH);
+ boolean readLatestOnMissingCkpt = props.getBoolean(
+ READ_LATEST_INSTANT_ON_MISSING_CKPT,
DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+
+ // Use begin Instant if set and non-empty
+ Option<String> beginInstant =
+ lastCkptStr.isPresent()
+ ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr
+ : Option.empty();
+
+ Pair<String, String> instantEndpts =
+ IncrSourceHelper.calculateBeginAndEndInstants(
+ sparkContext, srcPath, numInstantsPerFetch, beginInstant,
readLatestOnMissingCkpt);
+
+ if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
+ LOG.warn("Already caught up. Begin Checkpoint was :" +
instantEndpts.getKey());
+ return Pair.of(Option.empty(), instantEndpts.getKey());
+ }
+
+ // Do incremental pull. Set end instant if available.
+ DataFrameReader metaReader = sparkSession.read().format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE().key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(),
instantEndpts.getLeft())
+ .option(DataSourceReadOptions.END_INSTANTTIME().key(),
instantEndpts.getRight());
+ Dataset<Row> source = metaReader.load(srcPath);
+ // Extract distinct file keys from s3 meta hoodie table
+ final List<Row> cloudMetaDf = source
+ .filter("s3.object.size > 0")
+ .select("s3.bucket.name", "s3.object.key")
+ .distinct()
+ .collectAsList();
+ // Create S3 paths
+ final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK,
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+ List<String> cloudFiles = new ArrayList<>();
+ for (Row row : cloudMetaDf) {
+ // construct file path, row index 0 refers to bucket and 1 refers to key
+ String bucket = row.getString(0);
+ String filePath = S3_PREFIX + bucket + "/" + row.getString(1);
+ if (checkExists) {
+ FileSystem fs = FSUtils.getFs(S3_PREFIX + bucket,
sparkSession.sparkContext().hadoopConfiguration());
+ try {
+ if (fs.exists(new Path(filePath))) {
+ cloudFiles.add(filePath);
+ }
+ } catch (IOException e) {
+ LOG.error(String.format("Error while checking path exists for %s ",
filePath), e);
+ }
+ } else {
+ cloudFiles.add(filePath);
+ }
+ }
+ String fileFormat = props.getString(SOURCE_FILE_FORMAT,
DEFAULT_SOURCE_FILE_FORMAT);
+ Option<Dataset<Row>> dataset = Option.empty();
+ if (!cloudFiles.isEmpty()) {
+ dataset =
Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new
String[0])));
+ }
+ return Pair.of(dataset, instantEndpts.getRight());
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
new file mode 100644
index 0000000..717c414
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This source provides capability to create the hudi table for S3 events
metadata (eg. S3
+ * put events data). It will use the SQS for receiving the object key events.
This can be useful
+ * to check S3 files activity over time. The hudi table created by this source
is consumed by
+ * {@link S3EventsHoodieIncrSource} to apply changes to the hudi table
corresponding to user data.
+ */
+public class S3EventsSource extends RowSource {
+
+ private final S3EventsMetaSelector pathSelector;
+ private final List<Message> processedMessages = new ArrayList<>();
+ AmazonSQS sqs;
+
+ public S3EventsSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ super(props, sparkContext, sparkSession, schemaProvider);
+ this.pathSelector = S3EventsMetaSelector.createSourceSelector(props);
+ this.sqs = this.pathSelector.createAmazonSqsClient();
+ }
+
+ /**
+ * Fetches next events from the queue.
+ *
+ * @param lastCkptStr The last checkpoint instant string, empty if first run.
+ * @param sourceLimit Limit on the size of data to fetch. For {@link
S3EventsSource},
+ * {@link
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config#S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH}
is used.
+ * @return A pair of dataset of event records and the next checkpoint
instant string
+ */
+ @Override
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
+ Pair<List<String>, String> selectPathsWithLatestSqsMessage =
+ pathSelector.getNextEventsFromQueue(sqs, lastCkptStr,
processedMessages);
+ if (selectPathsWithLatestSqsMessage.getLeft().isEmpty()) {
+ return Pair.of(Option.empty(),
selectPathsWithLatestSqsMessage.getRight());
+ } else {
+ Dataset<String> eventRecords =
sparkSession.createDataset(selectPathsWithLatestSqsMessage.getLeft(),
Encoders.STRING());
+ return Pair.of(
+ Option.of(sparkSession.read().json(eventRecords)),
+ selectPathsWithLatestSqsMessage.getRight());
+ }
+ }
+
+ @Override
+ public void onCommit(String lastCkptStr) {
+ pathSelector.deleteProcessedMessages(sqs, pathSelector.queueUrl,
processedMessages);
+ processedMessages.clear();
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
new file mode 100644
index 0000000..7252494
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This class has methods for processing cloud objects.
+ * It currently supports only AWS S3 objects and AWS SQS queue.
+ */
+public class CloudObjectsSelector {
+ public static final List<String> ALLOWED_S3_EVENT_PREFIX =
+ Collections.singletonList("ObjectCreated");
+ public static final String S3_PREFIX = "s3://";
+ public static volatile Logger log =
LogManager.getLogger(CloudObjectsSelector.class);
+ public static final String SQS_ATTR_APPROX_MESSAGES =
"ApproximateNumberOfMessages";
+ static final String SQS_MODEL_MESSAGE = "Message";
+ static final String SQS_MODEL_EVENT_RECORDS = "Records";
+ static final String SQS_MODEL_EVENT_NAME = "eventName";
+ static final String S3_MODEL_EVENT_TIME = "eventTime";
+ static final String S3_FILE_SIZE = "fileSize";
+ static final String S3_FILE_PATH = "filePath";
+ public final String queueUrl;
+ public final int longPollWait;
+ public final int maxMessagesPerRequest;
+ public final int maxMessagePerBatch;
+ public final int visibilityTimeout;
+ public final TypedProperties props;
+ public final String fsName;
+ private final String regionName;
+
+ /**
+ * Cloud Objects Selector Class. {@link CloudObjectsSelector}
+ */
+ public CloudObjectsSelector(TypedProperties props) {
+ DataSourceUtils.checkRequiredProperties(props,
Arrays.asList(Config.S3_SOURCE_QUEUE_URL, Config.S3_SOURCE_QUEUE_REGION));
+ this.props = props;
+ this.queueUrl = props.getString(Config.S3_SOURCE_QUEUE_URL);
+ this.regionName = props.getString(Config.S3_SOURCE_QUEUE_REGION);
+ this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS,
"s3").toLowerCase();
+ this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20);
+ this.maxMessagePerBatch =
props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5);
+ this.visibilityTimeout =
props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30);
+ this.maxMessagesPerRequest = 10;
+ }
+
+ /**
+ * Get SQS queue attributes.
+ *
+ * @param sqsClient AWSClient for sqsClient
+ * @param queueUrl queue full url
+ * @return map of attributes needed
+ */
+ protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient,
String queueUrl) {
+ GetQueueAttributesResult queueAttributesResult =
sqsClient.getQueueAttributes(
+ new
GetQueueAttributesRequest(queueUrl).withAttributeNames(SQS_ATTR_APPROX_MESSAGES)
+ );
+ return queueAttributesResult.getAttributes();
+ }
+
+ /**
+ * Get the file attributes filePath, eventTime and size from JSONObject
record.
+ *
+ * @param record of object event
+ * @return map of file attribute
+ */
+ protected Map<String, Object> getFileAttributesFromRecord(JSONObject record)
throws UnsupportedEncodingException {
+ Map<String, Object> fileRecord = new HashMap<>();
+ String eventTimeStr = record.getString(S3_MODEL_EVENT_TIME);
+ long eventTime =
+
Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(eventTimeStr))).getTime();
+ JSONObject s3Object = record.getJSONObject("s3").getJSONObject("object");
+ String bucket =
URLDecoder.decode(record.getJSONObject("s3").getJSONObject("bucket").getString("name"),
"UTF-8");
+ String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
+ String filePath = this.fsName + "://" + bucket + "/" + key;
+ fileRecord.put(S3_MODEL_EVENT_TIME, eventTime);
+ fileRecord.put(S3_FILE_SIZE, s3Object.getLong("size"));
+ fileRecord.put(S3_FILE_PATH, filePath);
+ return fileRecord;
+ }
+
+ /**
+ * Amazon SQS Client Builder.
+ */
+ public AmazonSQS createAmazonSqsClient() {
+ return
AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build();
+ }
+
+ /**
+ * List messages from queue.
+ */
+ protected List<Message> getMessagesToProcess(
+ AmazonSQS sqsClient,
+ String queueUrl,
+ int longPollWait,
+ int visibilityTimeout,
+ int maxMessagePerBatch,
+ int maxMessagesPerRequest) {
+ List<Message> messagesToProcess = new ArrayList<>();
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(queueUrl)
+ .withWaitTimeSeconds(longPollWait)
+ .withVisibilityTimeout(visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(maxMessagesPerRequest);
+ // Get count for available messages
+ Map<String, String> queueAttributesResult =
getSqsQueueAttributes(sqsClient, queueUrl);
+ long approxMessagesAvailable =
Long.parseLong(queueAttributesResult.get(SQS_ATTR_APPROX_MESSAGES));
+ log.info("Approximately " + approxMessagesAvailable + " messages available
in queue.");
+ long numMessagesToProcess = Math.min(approxMessagesAvailable,
maxMessagePerBatch);
+ for (int i = 0;
+ i < (int) Math.ceil((double) numMessagesToProcess /
maxMessagesPerRequest);
+ ++i) {
+ List<Message> messages =
sqsClient.receiveMessage(receiveMessageRequest).getMessages();
+ log.debug("Number of messages: " + messages.size());
+ messagesToProcess.addAll(messages);
+ if (messages.isEmpty()) {
+ // ApproximateNumberOfMessages value is eventually consistent.
+ // So, we still need to check and break if there are no messages.
+ break;
+ }
+ }
+ return messagesToProcess;
+ }
+
+ /**
+ * Create partitions of list using specific batch size. we can't use third
party API for this
+ * functionality, due to
https://github.com/apache/hudi/blob/master/style/checkstyle.xml#L270
+ */
+ protected List<List<Message>> createListPartitions(List<Message> singleList,
int eachBatchSize) {
+ List<List<Message>> listPartitions = new ArrayList<>();
+ if (singleList.size() == 0 || eachBatchSize < 1) {
+ return listPartitions;
+ }
+ for (int start = 0; start < singleList.size(); start += eachBatchSize) {
+ int end = Math.min(start + eachBatchSize, singleList.size());
+ if (start > end) {
+ throw new IndexOutOfBoundsException(
+ "Index " + start + " is out of the list range <0," +
(singleList.size() - 1) + ">");
+ }
+ listPartitions.add(new ArrayList<>(singleList.subList(start, end)));
+ }
+ return listPartitions;
+ }
+
+ /**
+ * Delete batch of messages from queue.
+ */
+ protected void deleteBatchOfMessages(AmazonSQS sqs, String queueUrl,
List<Message> messagesToBeDeleted) {
+ DeleteMessageBatchRequest deleteBatchReq =
+ new DeleteMessageBatchRequest().withQueueUrl(queueUrl);
+ List<DeleteMessageBatchRequestEntry> deleteEntries =
deleteBatchReq.getEntries();
+ for (Message message : messagesToBeDeleted) {
+ deleteEntries.add(
+ new DeleteMessageBatchRequestEntry()
+ .withId(message.getMessageId())
+ .withReceiptHandle(message.getReceiptHandle()));
+ }
+ DeleteMessageBatchResult deleteResult =
sqs.deleteMessageBatch(deleteBatchReq);
+ List<String> deleteFailures =
+ deleteResult.getFailed().stream()
+ .map(BatchResultErrorEntry::getId)
+ .collect(Collectors.toList());
+ if (!deleteFailures.isEmpty()) {
+ log.warn(
+ "Failed to delete "
+ + deleteFailures.size()
+ + " messages out of "
+ + deleteEntries.size()
+ + " from queue.");
+ } else {
+ log.info("Successfully deleted " + deleteEntries.size() + " messages
from queue.");
+ }
+ }
+
+ /**
+ * Delete Queue Messages after hudi commit. This method will be invoked by
source.onCommit.
+ */
+ public void deleteProcessedMessages(AmazonSQS sqs, String queueUrl,
List<Message> processedMessages) {
+ if (!processedMessages.isEmpty()) {
+ // create batch for deletion, SES DeleteMessageBatchRequest only accept
max 10 entries
+ List<List<Message>> deleteBatches =
createListPartitions(processedMessages, 10);
+ for (List<Message> deleteBatch : deleteBatches) {
+ deleteBatchOfMessages(sqs, queueUrl, deleteBatch);
+ }
+ }
+ }
+
+ /**
+ * Configs supported.
+ */
+ public static class Config {
+ private static final String HOODIE_DELTASTREAMER_S3_SOURCE =
"hoodie.deltastreamer.s3.source";
+ /**
+ * {@value #S3_SOURCE_QUEUE_URL} is the queue url for cloud object events.
+ */
+ public static final String S3_SOURCE_QUEUE_URL =
HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.url";
+
+ /**
+ * {@value #S3_SOURCE_QUEUE_REGION} is the case-sensitive region name of
the cloud provider for the queue. For example, "us-east-1".
+ */
+ public static final String S3_SOURCE_QUEUE_REGION =
HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.region";
+
+ /**
+ * {@value #S3_SOURCE_QUEUE_FS} is file system corresponding to queue. For
example, for AWS SQS it is s3/s3a.
+ */
+ public static final String S3_SOURCE_QUEUE_FS =
HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.fs";
+
+ /**
+ * {@value #S3_QUEUE_LONG_POLL_WAIT} is the long poll wait time in seconds
If set as 0 then
+ * client will fetch on short poll basis.
+ */
+ public static final String S3_QUEUE_LONG_POLL_WAIT =
+ HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.long.poll.wait";
+
+ /**
+ * {@value #S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH} is max messages for
each batch of delta streamer
+ * run. Source will process these maximum number of message at a time.
+ */
+ public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH =
+ HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch";
+
+ /**
+ * {@value #S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT} is visibility timeout for
messages in queue. After we
+ * consume the message, queue will move the consumed messages to in-flight
state, these messages
+ * can't be consumed again by source for this timeout period.
+ */
+ public static final String S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT =
+ HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.visibility.timeout";
+
+ /**
+ * {@value #SOURCE_INPUT_SELECTOR} source input selector.
+ */
+ public static final String SOURCE_INPUT_SELECTOR =
"hoodie.deltastreamer.source.input.selector";
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
new file mode 100644
index 0000000..68ac7ab
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * S3 events metadata selector class. This class provides methods to process
the
+ * messages from SQS for {@link
org.apache.hudi.utilities.sources.S3EventsSource}.
+ */
+public class S3EventsMetaSelector extends CloudObjectsSelector {
+
+ private static final String S3_EVENT_RESPONSE_ELEMENTS = "responseElements";
+
+ /**
+ * Cloud Objects Meta Selector Class. {@link CloudObjectsSelector}
+ */
+ public S3EventsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * S3EventsMetaSelector}
+ */
+ public static S3EventsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ S3EventsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ S3EventsMetaSelector.class.getName());
+ try {
+ S3EventsMetaSelector selector =
+ (S3EventsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the filtered list of valid S3 events in SQS.
+ */
+ protected List<Map<String, Object>> getValidEvents(AmazonSQS sqs,
List<Message> processedMessages) throws IOException {
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ this.longPollWait,
+ this.visibilityTimeout,
+ this.maxMessagePerBatch,
+ this.maxMessagesPerRequest);
+ return processAndDeleteInvalidMessages(processedMessages, messages);
+ }
+
+ private List<Map<String, Object>>
processAndDeleteInvalidMessages(List<Message> processedMessages,
+
List<Message> messages) throws IOException {
+ List<Map<String, Object>> validEvents = new ArrayList<>();
+ for (Message message : messages) {
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+ if (messageBody.has(SQS_MODEL_MESSAGE)) {
+ // If this messages is from S3Event -> SNS -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.getString(SQS_MODEL_MESSAGE), Map.class);
+ } else {
+ // If this messages is from S3Event -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.toString(), Map.class);
+ }
+ if (messageMap.containsKey(SQS_MODEL_EVENT_RECORDS)) {
+ List<Map<String, Object>> events = (List<Map<String, Object>>)
messageMap.get(SQS_MODEL_EVENT_RECORDS);
+ for (Map<String, Object> event : events) {
+ event.remove(S3_EVENT_RESPONSE_ELEMENTS);
+ String eventName = (String) event.get(SQS_MODEL_EVENT_NAME);
+ // filter only allowed s3 event types
+ if
(ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+ validEvents.add(event);
+ } else {
+ log.debug(String.format("This S3 event %s is not allowed, so
ignoring it.", eventName));
+ }
+ }
+ } else {
+ log.debug(String.format("Message is not expected format or it's
s3:TestEvent. Message: %s", message));
+ }
+ processedMessages.add(message);
+ }
+ return validEvents;
+ }
+
+ /**
+ * Get the list of events from queue.
+ *
+ * @param lastCheckpointStr The last checkpoint instant string, empty if
first run.
+ * @return A pair of dataset of event records and the next checkpoint
instant string.
+ */
+ public Pair<List<String>, String> getNextEventsFromQueue(AmazonSQS sqs,
+ Option<String>
lastCheckpointStr,
+ List<Message>
processedMessages) {
+ processedMessages.clear();
+ log.info("Reading messages....");
+ try {
+ log.info("Start Checkpoint : " + lastCheckpointStr);
+ List<Map<String, Object>> eventRecords = getValidEvents(sqs,
processedMessages);
+ log.info("Number of valid events: " + eventRecords.size());
+ List<String> filteredEventRecords = new ArrayList<>();
+ long newCheckpointTime = eventRecords.stream()
+ .mapToLong(eventRecord -> Date.from(Instant.from(
+ DateTimeFormatter.ISO_INSTANT.parse((String)
eventRecord.get(S3_MODEL_EVENT_TIME))))
+
.getTime()).max().orElse(lastCheckpointStr.map(Long::parseLong).orElse(0L));
+
+ for (Map<String, Object> eventRecord : eventRecords) {
+ filteredEventRecords.add(new
ObjectMapper().writeValueAsString(eventRecord).replace("%3D", "="));
+ }
+ return new ImmutablePair<>(filteredEventRecords,
String.valueOf(newCheckpointTime));
+ } catch (JSONException | IOException e) {
+ throw new HoodieException("Unable to read from SQS: ", e);
+ }
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
new file mode 100644
index 0000000..3d89dc2
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import
org.apache.hudi.utilities.testutils.sources.AbstractCloudObjectsSourceTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_FS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Basic tests for {@link S3EventsSource}.
+ */
+public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase {
+
+ @BeforeEach
+ public void setup() throws Exception {
+ super.setup();
+ this.dfsRoot = dfsBasePath + "/parquetFiles";
+ this.fileSuffix = ".parquet";
+ dfs.mkdirs(new Path(dfsRoot));
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ super.teardown();
+ }
+
+ /**
+ * Runs the test scenario of reading data from the source.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadingFromSource() throws IOException {
+
+ SourceFormatAdapter sourceFormatAdapter = new
SourceFormatAdapter(prepareCloudObjectSource());
+
+ // 1. Extract without any checkpoint => (no data available)
+ generateMessageInQueue(null);
+ assertEquals(
+ Option.empty(),
+ sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(),
Long.MAX_VALUE).getBatch());
+
+ // 2. Extract without any checkpoint => (adding new file)
+ generateMessageInQueue("1");
+
+ // Test fetching Avro format
+ InputBatch<JavaRDD<GenericRecord>> fetch1 =
+ sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(),
Long.MAX_VALUE);
+ assertEquals(1, fetch1.getBatch().get().count());
+
+ // 3. Produce new data, extract new data
+ generateMessageInQueue("2");
+ // Test fetching Avro format
+ InputBatch<JavaRDD<GenericRecord>> fetch2 =
+ sourceFormatAdapter.fetchNewDataInAvroFormat(
+ Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+ assertEquals(1, fetch2.getBatch().get().count());
+
+ GenericRecord s3 = (GenericRecord)
fetch2.getBatch().get().rdd().first().get("s3");
+ GenericRecord s3Object = (GenericRecord) s3.get("object");
+ assertEquals("2.parquet", s3Object.get("key").toString());
+ }
+
+ @Override
+ public Source prepareCloudObjectSource() {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
+ props.setProperty(S3_SOURCE_QUEUE_REGION, regionName);
+ props.setProperty(S3_SOURCE_QUEUE_FS, "hdfs");
+ S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession,
null);
+ dfsSource.sqs = this.sqs;
+ return dfsSource;
+ }
+
+ @Override
+ public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws
IOException {
+ Helpers.saveParquetToDFS(Helpers.toGenericRecords(records), path);
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java
new file mode 100644
index 0000000..02eaccf
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.JSONObject;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_FILE_PATH;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_FILE_SIZE;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_MODEL_EVENT_TIME;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_MODEL_EVENT_RECORDS;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_MODEL_MESSAGE;
+import static
org.apache.hudi.utilities.testutils.CloudObjectTestUtils.deleteMessagesInQueue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
 * Unit tests for {@link CloudObjectsSelector} helper methods, exercised against a mocked SQS client.
 */
public class TestCloudObjectsSelector extends HoodieClientTestHarness {

  // Region used for all selector configs in these tests; also reused by TestS3EventsMetaSelector.
  static final String REGION_NAME = "us-east-1";

  TypedProperties props;
  String sqsUrl;

  // Mocked SQS client; stubbed per-test via CloudObjectTestUtils.
  @Mock
  AmazonSQS sqs;

  @Mock
  private CloudObjectsSelector cloudObjectsSelector;

  @BeforeEach
  void setUp() {
    initSparkContexts();
    initPath();
    initFileSystem();
    MockitoAnnotations.initMocks(this);

    props = new TypedProperties();
    sqsUrl = "test-queue";
    props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
    props.setProperty(S3_SOURCE_QUEUE_REGION, REGION_NAME);
  }

  @AfterEach
  public void teardown() throws Exception {
    Mockito.reset(cloudObjectsSelector);
    cleanupResources();
  }

  // getSqsQueueAttributes should return exactly the one required attribute
  // (ApproximateNumberOfMessages), even for an empty queue.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testSqsQueueAttributesShouldReturnsRequiredAttribute(Class<?> clazz) {
    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);

    // setup the queue attributes (null path => empty queue, count "0")
    CloudObjectTestUtils.setMessagesInQueue(sqs, null);

    // test the return values
    Map<String, String> queueAttributes = selector.getSqsQueueAttributes(sqs, sqsUrl);
    assertEquals(1, queueAttributes.size());
    // ApproximateNumberOfMessages is a required queue attribute for Cloud object selector
    assertEquals("0", queueAttributes.get(SQS_ATTR_APPROX_MESSAGES));
  }

  // getFileAttributesFromRecord should extract size, fully-qualified s3 path and
  // event time (epoch millis) from a single S3 event record.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testFileAttributesFromRecordShouldReturnsExpectOutput(Class<?> clazz)
      throws IOException {

    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);

    // setup s3 record: an SNS-wrapped ObjectCreated:Copy notification with the
    // bucket/key below spliced into the escaped inner JSON payload.
    String bucket = "test-bucket";
    String key = "test/year=test1/month=test2/day=test3/part-foo-bar.snappy.parquet";

    String s3Records =
        "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"1\",\n \"TopicArn\" : \"arn:aws:sns:foo:123:"
            + "foo-bar\",\n \"Subject\" : \"Amazon S3 Notification\",\n \"Message\" : \"{\\\"Records\\\":"
            + "[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us"
            + "-west-2\\\",\\\"eventTime\\\":\\\"2021-07-27T09:05:36.755Z\\\",\\\"eventName\\\":\\\"ObjectCreated"
            + ":Copy\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:test\\\"},\\\"requestParameters\\\":"
            + "{\\\"sourceIPAddress\\\":\\\"0.0.0.0\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\""
            + "test\\\",\\\"x-amz-id-2\\\":\\\"foobar\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\""
            + "configurationId\\\":\\\"foobar\\\",\\\"bucket\\\":{\\\"name\\\":\\\""
            + bucket
            + "\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"foo\\\"},\\\"arn\\\":\\\"arn:aws:s3:::foo\\\"}"
            + ",\\\"object\\\":{\\\"key\\\":\\\""
            + key
            + "\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}";
    JSONObject messageBody = new JSONObject(s3Records);
    Map<String, Object> messageMap = new HashMap<>();
    if (messageBody.has(SQS_MODEL_MESSAGE)) {
      // Unwrap the SNS envelope to get the actual S3 event payload.
      ObjectMapper mapper = new ObjectMapper();
      messageMap =
          (Map<String, Object>) mapper.readValue(messageBody.getString(SQS_MODEL_MESSAGE), Map.class);
    }
    List<Map<String, Object>> records = (List<Map<String, Object>>) messageMap.get(SQS_MODEL_EVENT_RECORDS);

    // test the return values
    Map<String, Object> fileAttributes =
        selector.getFileAttributesFromRecord(new JSONObject(records.get(0)));

    assertEquals(3, fileAttributes.size());
    assertEquals(123L, (long) fileAttributes.get(S3_FILE_SIZE));
    assertEquals(S3_PREFIX + bucket + "/" + key, fileAttributes.get(S3_FILE_PATH));
    // 1627376736755 == 2021-07-27T09:05:36.755Z in epoch millis
    assertEquals(1627376736755L, (long) fileAttributes.get(S3_MODEL_EVENT_TIME));
  }

  // createListPartitions should split 5 messages into batches of 2 + 2 + 1, preserving order.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testCreateListPartitionsReturnsExpectedSetOfBatch(Class<?> clazz) {

    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);

    // setup lists
    List<Message> testSingleList = new ArrayList<>();
    testSingleList.add(new Message().addAttributesEntry("id", "1"));
    testSingleList.add(new Message().addAttributesEntry("id", "2"));
    testSingleList.add(new Message().addAttributesEntry("id", "3"));
    testSingleList.add(new Message().addAttributesEntry("id", "4"));
    testSingleList.add(new Message().addAttributesEntry("id", "5"));

    List<Message> expectedFirstList = new ArrayList<>();
    expectedFirstList.add(new Message().addAttributesEntry("id", "1"));
    expectedFirstList.add(new Message().addAttributesEntry("id", "2"));

    List<Message> expectedSecondList = new ArrayList<>();
    expectedSecondList.add(new Message().addAttributesEntry("id", "3"));
    expectedSecondList.add(new Message().addAttributesEntry("id", "4"));

    List<Message> expectedFinalList = new ArrayList<>();
    expectedFinalList.add(new Message().addAttributesEntry("id", "5"));

    // test the return values
    List<List<Message>> partitionedList = selector.createListPartitions(testSingleList, 2);

    assertEquals(3, partitionedList.size());
    assertEquals(expectedFirstList, partitionedList.get(0));
    assertEquals(expectedSecondList, partitionedList.get(1));
    assertEquals(expectedFinalList, partitionedList.get(2));
  }

  // A batch size of zero should produce no partitions rather than throwing.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testCreateListPartitionsReturnsEmptyIfBatchSizeIsZero(Class<?> clazz) {

    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);

    // setup lists
    List<Message> testSingleList = new ArrayList<>();
    testSingleList.add(new Message().addAttributesEntry("id", "1"));
    testSingleList.add(new Message().addAttributesEntry("id", "2"));

    // test the return values
    List<List<Message>> partitionedList = selector.createListPartitions(testSingleList, 0);

    assertEquals(0, partitionedList.size());
  }

  // deleteProcessedMessages should complete without error against a stubbed
  // deleteMessageBatch.
  // NOTE(review): this test has no assertion; consider verifying the interaction,
  // e.g. Mockito.verify(sqs).deleteMessageBatch(...), to catch silent no-ops.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testOnCommitDeleteProcessedMessages(Class<?> clazz) {

    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);

    // setup lists
    List<Message> testSingleList = new ArrayList<>();
    testSingleList.add(
        new Message()
            .addAttributesEntry("MessageId", "1")
            .addAttributesEntry("ReceiptHandle", "1"));
    testSingleList.add(
        new Message()
            .addAttributesEntry("MessageId", "2")
            .addAttributesEntry("ReceiptHandle", "1"));

    deleteMessagesInQueue(sqs);

    // test the return values
    selector.deleteProcessedMessages(sqs, sqsUrl, testSingleList);
  }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
new file mode 100644
index 0000000..2208543
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.hadoop.fs.Path;
+import org.json.JSONObject;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
+import static
org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector.REGION_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestS3EventsMetaSelector extends HoodieClientTestHarness {
+
+ TypedProperties props;
+ String sqsUrl;
+
+ @Mock
+ AmazonSQS sqs;
+
+ @Mock
+ private S3EventsMetaSelector s3EventsMetaSelector;
+
+ @BeforeEach
+ void setUp() {
+ initSparkContexts();
+ initPath();
+ initFileSystem();
+ MockitoAnnotations.initMocks(this);
+
+ props = new TypedProperties();
+ sqsUrl = "test-queue";
+ props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
+ props.setProperty(S3_SOURCE_QUEUE_REGION, REGION_NAME);
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ Mockito.reset(s3EventsMetaSelector);
+ cleanupResources();
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = {S3EventsMetaSelector.class})
+ public void testNextEventsFromQueueShouldReturnsEventsFromQueue(Class<?>
clazz) {
+ S3EventsMetaSelector selector = (S3EventsMetaSelector)
ReflectionUtils.loadClass(clazz.getName(), props);
+ // setup s3 record
+ String bucket = "test-bucket";
+ String key = "part-foo-bar.snappy.parquet";
+ Path path = new Path(bucket, key);
+ CloudObjectTestUtils.setMessagesInQueue(sqs, path);
+
+ List<Message> processed = new ArrayList<>();
+
+ // test the return values
+ Pair<List<String>, String> eventFromQueue =
+ selector.getNextEventsFromQueue(sqs, Option.empty(), processed);
+
+ assertEquals(1, eventFromQueue.getLeft().size());
+ assertEquals(1, processed.size());
+ assertEquals(
+ key,
+ new JSONObject(eventFromQueue.getLeft().get(0))
+ .getJSONObject("s3")
+ .getJSONObject("object")
+ .getString("key"));
+ assertEquals("1627376736755", eventFromQueue.getRight());
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java
new file mode 100644
index 0000000..49ea2de
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Utils Class for unit testing on CloudObject related sources.
+ */
+public class CloudObjectTestUtils {
+
+ /**
+ * Set a return value for mocked sqs instance. It will add a new messages
(s3 Event) and set
+ * ApproximateNumberOfMessages attribute of the queue.
+ *
+ * @param sqs Mocked instance of AmazonSQS
+ * @param path Absolute Path of file in FileSystem
+ */
+ public static void setMessagesInQueue(AmazonSQS sqs, Path path) {
+
+ ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
+ String approximateNumberOfMessages = "0";
+
+ if (path != null) {
+ String body =
+ "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"1\",\n
\"TopicArn\" : \"arn:aws:sns:foo:123:"
+ + "foo-bar\",\n \"Subject\" : \"Amazon S3 Notification\",\n
\"Message\" : \"{\\\"Records\\\":"
+ +
"[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us"
+ +
"-west-2\\\",\\\"eventTime\\\":\\\"2021-07-27T09:05:36.755Z\\\",\\\"eventName\\\":\\\"ObjectCreated"
+ +
":Copy\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:test\\\"},\\\"requestParameters\\\":"
+ +
"{\\\"sourceIPAddress\\\":\\\"0.0.0.0\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\""
+ +
"test\\\",\\\"x-amz-id-2\\\":\\\"foobar\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\""
+ +
"configurationId\\\":\\\"foobar\\\",\\\"bucket\\\":{\\\"name\\\":\\\""
+ + path.getParent().toString().replace("hdfs://", "")
+ +
"\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"foo\\\"},\\\"arn\\\":\\\"arn:aws:s3:::foo\\\"}"
+ + ",\\\"object\\\":{\\\"key\\\":\\\""
+ + path.getName()
+ +
"\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}";
+
+ Message message = new Message();
+ message.setReceiptHandle("1");
+ message.setMessageId("1");
+ message.setBody(body);
+
+ List<Message> messages = new ArrayList<>();
+ messages.add(message);
+ receiveMessageResult.setMessages(messages);
+ approximateNumberOfMessages = "1";
+ }
+
when(sqs.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
+ when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
+ .thenReturn(
+ new GetQueueAttributesResult()
+ .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES,
approximateNumberOfMessages));
+ }
+
+ /**
+ * Mock the sqs.deleteMessageBatch() method from queue.
+ *
+ * @param sqs Mocked instance of AmazonSQS
+ */
+ public static void deleteMessagesInQueue(AmazonSQS sqs) {
+ when(sqs.deleteMessageBatch(any(DeleteMessageBatchRequest.class)))
+ .thenReturn(new DeleteMessageBatchResult());
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
new file mode 100644
index 0000000..dfd3795
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.testutils.sources;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An abstract test base for {@link Source} using CloudObjects as the file system.
+ */
+public abstract class AbstractCloudObjectsSourceTestBase extends UtilitiesTestBase {
+
+ protected FilebasedSchemaProvider schemaProvider;
+ protected String dfsRoot;
+ protected String fileSuffix;
+ protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ protected boolean useFlattenedSchema = false;
+ protected String sqsUrl = "test-queue";
+ protected String regionName = "us-east-1";
+ @Mock
+ protected AmazonSQS sqs;
+
+ @BeforeAll
+ public static void initClass() throws Exception {
+ UtilitiesTestBase.initClass();
+ }
+
+ @AfterAll
+ public static void cleanupClass() {
+ UtilitiesTestBase.cleanupClass();
+ }
+
+ @BeforeEach
+ public void setup() throws Exception {
+ super.setup();
+ schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ super.teardown();
+ }
+
+ /**
+ * Prepares the specific {@link Source} to test, by passing in necessary configurations.
+ *
+ * @return A {@link Source} using DFS as the file system.
+ */
+ protected abstract Source prepareCloudObjectSource();
+
+ /**
+ * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS.
+ *
+ * @param records Test data.
+ * @param path The path in {@link Path} of the file to write.
+ */
+ protected abstract void writeNewDataToFile(List<HoodieRecord> records, Path path)
+ throws IOException;
+
+ /**
+ * Generates a batch of test data and writes the data to a file.
+ *
+ * @param filename The name of the file.
+ * @param instantTime The commit time.
+ * @param n The number of records to generate.
+ * @return The file path.
+ */
+ protected Path generateOneFile(String filename, String instantTime, int n) throws IOException {
+ Path path = new Path(dfsRoot, filename + fileSuffix);
+ writeNewDataToFile(dataGenerator.generateInserts(instantTime, n, useFlattenedSchema), path);
+ return path;
+ }
+
+ public void generateMessageInQueue(String filename) {
+ Path path = null;
+ if (filename != null) {
+ path = new Path(dfsRoot, filename + fileSuffix);
+ }
+ CloudObjectTestUtils.setMessagesInQueue(sqs, path);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 9f523aa..53d25fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,6 +153,7 @@
<shadeSources>true</shadeSources>
<zk-curator.version>2.7.1</zk-curator.version>
<antlr.version>4.7</antlr.version>
+ <aws.sdk.version>1.12.22</aws.sdk.version>
</properties>
<scm>