codope commented on a change in pull request #3433:
URL: https://github.com/apache/hudi/pull/3433#discussion_r688480052



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Cloud objects selector: helper methods for processing cloud objects. It currently
+ * supports only AWS S3 objects and AWS SQS queues.
+ */
+public class CloudObjectsSelector {
+  // S3 event-name prefixes this selector is meant to handle (only "ObjectCreated" for now);
+  // the code that consults this list is outside this excerpt — confirm how it is matched.
+  public static final List<String> ALLOWED_S3_EVENT_PREFIX =
+      Collections.singletonList("ObjectCreated");
+  // NOTE(review): non-final and volatile; a `private static final` logger is the usual form —
+  // confirm nothing reassigns or reads this from outside before tightening it.
+  public static volatile Logger log = 
LogManager.getLogger(CloudObjectsSelector.class);
+  // Full URL of the SQS queue (required property Config.QUEUE_URL_PROP).
+  public final String queueUrl;
+  // Long-poll wait from Config.QUEUE_LONGPOLLWAIT_PROP (default 20); presumably seconds — confirm.
+  public final int longPollWait;
+  // Fixed at 10 by the constructor; not configurable.
+  public final int maxMessagesEachRequest;
+  // Max messages per batch from Config.QUEUE_MAXMESSAGESEACHBATCH_PROP (default 5).
+  public final int maxMessageEachBatch;
+  // Visibility timeout from Config.QUEUE_VISIBILITYTIMEOUT_PROP (default 30); presumably seconds.
+  public final int visibilityTimeout;
+  // The configuration this selector was constructed from.
+  public final TypedProperties props;
+  // Lower-cased filesystem scheme prefixed to object paths (Config.SOURCE_QUEUE_FS_PROP, default "s3").
+  public final String fsName;
+  // AWS region name used to build the SQS client (required property Config.QUEUE_REGION).
+  private final String regionName;
+
+  /**
+   * Creates a selector configured from the given properties.
+   *
+   * <p>{@code Config.QUEUE_URL_PROP} and {@code Config.QUEUE_REGION} are required. The remaining
+   * settings fall back to defaults: filesystem scheme {@code "s3"}, long-poll wait {@code 20},
+   * max messages per batch {@code 5} and visibility timeout {@code 30}.
+   *
+   * @param props source configuration; must contain the required queue URL and region
+   */
+  public CloudObjectsSelector(TypedProperties props) {
+    DataSourceUtils.checkRequiredProperties(
+        props, Arrays.asList(Config.QUEUE_URL_PROP, Config.QUEUE_REGION));
+    this.props = props;
+    this.queueUrl = props.getString(Config.QUEUE_URL_PROP);
+    this.regionName = props.getString(Config.QUEUE_REGION);
+    // Locale.ROOT keeps the scheme independent of the JVM default locale
+    // (e.g. under a Turkish locale "I" would otherwise lower-case to a dotless "ı").
+    this.fsName = props.getString(Config.SOURCE_QUEUE_FS_PROP, "s3").toLowerCase(Locale.ROOT);
+    this.longPollWait = props.getInteger(Config.QUEUE_LONGPOLLWAIT_PROP, 20);
+    this.maxMessageEachBatch = props.getInteger(Config.QUEUE_MAXMESSAGESEACHBATCH_PROP, 5);
+    this.visibilityTimeout = props.getInteger(Config.QUEUE_VISIBILITYTIMEOUT_PROP, 30);
+    // Not configurable; presumably mirrors the SQS ReceiveMessage cap of 10 messages per call.
+    this.maxMessagesEachRequest = 10;
+  }
+
+  /**
+   * Asks SQS for the {@code ApproximateNumberOfMessages} attribute of a queue.
+   *
+   * @param sqsClient client used to issue the GetQueueAttributes call
+   * @param queueUrl  full URL of the queue to inspect
+   * @return the attribute map SQS returned for the requested attribute
+   */
+  protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient, String queueUrl) {
+    GetQueueAttributesRequest attributeRequest = new GetQueueAttributesRequest(queueUrl)
+        .withAttributeNames("ApproximateNumberOfMessages");
+    GetQueueAttributesResult attributeResponse = sqsClient.getQueueAttributes(attributeRequest);
+    return attributeResponse.getAttributes();
+  }
+
+  /**
+   * Extracts the file path, event time and object size from one S3 event record.
+   *
+   * <p>The bucket name and object key are URL-decoded as UTF-8 and joined as
+   * {@code <fsName>://<bucket>/<key>}.
+   *
+   * @param record one S3 event record; reads {@code eventTime}, {@code s3.bucket.name},
+   *               {@code s3.object.key} and {@code s3.object.size}
+   * @return map with keys {@code filePath} (String), {@code eventTime} (epoch milliseconds,
+   *         Long) and {@code fileSize} (Long)
+   * @throws UnsupportedEncodingException declared by {@link URLDecoder#decode(String, String)};
+   *         not expected in practice because UTF-8 is always available
+   */
+  protected Map<String, Object> getFileAttributesFromRecord(JSONObject record)
+      throws UnsupportedEncodingException {
+    // eventTime is an ISO-8601 instant. Instant.parse uses the same ISO_INSTANT formatter as
+    // before, without the detour through legacy java.util.Date.
+    long eventTime = Instant.parse(record.getString("eventTime")).toEpochMilli();
+
+    JSONObject s3 = record.getJSONObject("s3");
+    JSONObject s3Object = s3.getJSONObject("object");
+    // Object keys in S3 event notifications arrive URL-encoded.
+    String bucket = URLDecoder.decode(s3.getJSONObject("bucket").getString("name"), "UTF-8");
+    String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
+    String filePath = this.fsName + "://" + bucket + "/" + key;
+
+    Map<String, Object> fileRecord = new HashMap<>();
+    fileRecord.put("eventTime", eventTime);
+    fileRecord.put("fileSize", s3Object.getLong("size"));
+    fileRecord.put("filePath", filePath);
+    return fileRecord;
+  }
+
+  /**
+   * Amazon SQS Client Builder.
+   */
+  public AmazonSQS createAmazonSqsClient() {
+    return 
AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build();
+  }
+
+  /**
+   * List messages from queue.
+   */
+  protected List<Message> getMessagesToProcess(
+      AmazonSQS sqsClient,
+      String queueUrl,
+      ReceiveMessageRequest receiveMessageRequest,
+      int maxMessageEachBatch,
+      int maxMessagesEachRequest) {
+    List<Message> messagesToProcess = new ArrayList<>();
+
+    // Get count for available messages
+    Map<String, String> queueAttributesResult = 
getSqsQueueAttributes(sqsClient, queueUrl);
+    long approxMessagesAvailable =
+        
Long.parseLong(queueAttributesResult.get("ApproximateNumberOfMessages"));
+    log.info("Approx. " + approxMessagesAvailable + " messages available in 
queue.");

Review comment:
       Done. However, we still need to check `messages.isEmpty()` and break out of
the loop, because the value of `ApproximateNumberOfMessages` returned by SQS is
only eventually consistent. If it reports a positive count while there are
actually no messages, we don't want to run the loop again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to