[
https://issues.apache.org/jira/browse/HUDI-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398147#comment-17398147
]
ASF GitHub Bot commented on HUDI-1897:
--------------------------------------
nsivabalan commented on a change in pull request #3433:
URL: https://github.com/apache/hudi/pull/3433#discussion_r687849798
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
+
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+
+ if (messageBody.has("Message")) {
Review comment:
Can we create constants for these string literals — "Message" and "Records" (L119) —
and use those constants instead of repeating the raw strings?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
Review comment:
Minor: rename this variable to isEligibleMsg.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
+
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+
+ if (messageBody.has("Message")) {
+ // If this messages is from S3Event -> SNS -> SQS
+ messageMap =
+ (Map<String, Object>)
mapper.readValue(messageBody.getString("Message"), Map.class);
+ } else {
+ // If this messages is from S3Event -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.toString(), Map.class);
+ }
+ if (messageMap.containsKey("Records")) {
+ List<Map<String, Object>> records = (List<Map<String, Object>>)
messageMap.get("Records");
+ for (Map<String, Object> record : records) {
+ String eventName = (String) record.get("eventName");
+
+ // filter only allowed s3 event types
+ if
(ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+ eligibleRecords.add(record);
+ isMessageDelete = Boolean.FALSE;
+ processedMessages.add(message);
+
+ } else {
+ log.info("This S3 event " + eventName + " is not allowed, so
ignoring it.");
Review comment:
Move this log statement to debug level.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
+
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+
+ if (messageBody.has("Message")) {
+ // If this messages is from S3Event -> SNS -> SQS
+ messageMap =
+ (Map<String, Object>)
mapper.readValue(messageBody.getString("Message"), Map.class);
+ } else {
+ // If this messages is from S3Event -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.toString(), Map.class);
+ }
+ if (messageMap.containsKey("Records")) {
+ List<Map<String, Object>> records = (List<Map<String, Object>>)
messageMap.get("Records");
+ for (Map<String, Object> record : records) {
+ String eventName = (String) record.get("eventName");
+
+ // filter only allowed s3 event types
+ if
(ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+ eligibleRecords.add(record);
+ isMessageDelete = Boolean.FALSE;
+ processedMessages.add(message);
+
+ } else {
+ log.info("This S3 event " + eventName + " is not allowed, so
ignoring it.");
+ }
+ }
+ } else {
+ log.info("Message is not expected format or it's s3:TestEvent");
+ }
+ if (isMessageDelete) {
+ ineligibleMessages.add(message);
+ }
+ }
+ if (!ineligibleMessages.isEmpty()) {
+ deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
+ }
+
+ return eligibleRecords;
+ }
+
+ /**
+ * Get the list of events from queue.
+ *
+ * @param sparkContext JavaSparkContext to help parallelize certain
operations
+ * @param lastCheckpointStr the last checkpoint time string, empty if first
run
+ * @return the list of events
+ */
+ public Pair<List<String>, String> getNextEventsFromQueue(
+ AmazonSQS sqs,
+ JavaSparkContext sparkContext,
+ Option<String> lastCheckpointStr,
+ List<Message> processedMessages) {
+
+ processedMessages.clear();
+
+ log.info("Reading messages....");
+
+ try {
+ log.info("Start Checkpoint : " + lastCheckpointStr);
+
+ long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+
+ List<Map<String, Object>> eligibleEventRecords = getEligibleEvents(sqs,
processedMessages);
Review comment:
We could just name this "eventRecords" or simply "records". We don't
need to repeat "eligible" everywhere — it's implicit.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
Review comment:
Also, can we use "valid" and "invalid" instead of "eligible" and "ineligible"?
"Valid" and "invalid" are the more commonly used terms.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsHoodieIncrSource.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cloud Objects Hoodie Incr Source Class. {@link
CloudObjectsHoodieIncrSource}.This source will use
+ * the cloud files meta information form cloud meta hoodie table generate by
CloudObjectsMetaSource.
+ */
+public class CloudObjectsHoodieIncrSource extends HoodieIncrSource {
+
+ private static final Logger LOG =
LogManager.getLogger(CloudObjectsHoodieIncrSource.class);
+
+ public CloudObjectsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ super(props, sparkContext, sparkSession, schemaProvider);
+ }
+
+ @Override
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(
+ Option<String> lastCkptStr, long sourceLimit) {
+
+ DataSourceUtils.checkRequiredProperties(
+ props, Collections.singletonList(Config.HOODIE_SRC_BASE_PATH));
+
+ String srcPath = props.getString(Config.HOODIE_SRC_BASE_PATH);
+ int numInstantsPerFetch =
+ props.getInteger(Config.NUM_INSTANTS_PER_FETCH,
Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
+ boolean readLatestOnMissingCkpt =
+ props.getBoolean(
+ Config.READ_LATEST_INSTANT_ON_MISSING_CKPT,
+ Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+
+ // Use begin Instant if set and non-empty
+ Option<String> beginInstant =
+ lastCkptStr.isPresent()
+ ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr
+ : Option.empty();
+
+ Pair<String, String> instantEndpts =
+ IncrSourceHelper.calculateBeginAndEndInstants(
+ sparkContext, srcPath, numInstantsPerFetch, beginInstant,
readLatestOnMissingCkpt);
+
+ if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
+ LOG.warn("Already caught up. Begin Checkpoint was :" +
instantEndpts.getKey());
+ return Pair.of(Option.empty(), instantEndpts.getKey());
+ }
+
+ // Do Incr pull. Set end instant if available
+ DataFrameReader reader =
+ sparkSession
+ .read()
+ .format("org.apache.hudi")
+ .option(
+ DataSourceReadOptions.QUERY_TYPE().key(),
+ DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+ .option(
+ DataSourceReadOptions.BEGIN_INSTANTTIME().key(),
instantEndpts.getLeft())
+ .option(
+ DataSourceReadOptions.END_INSTANTTIME().key(),
instantEndpts.getRight());
+
+ Dataset<Row> source = reader.load(srcPath);
+
+ // Extract distinct file keys from cloud meta hoodie table
+ final List<Row> cloudMetaDf =
+ source
+ .filter("s3.object.size > 0")
+ .select("s3.bucket.name", "s3.object.key")
+ .distinct()
+ .collectAsList();
+
+ // Create S3 paths
+ List<String> cloudFiles = new ArrayList<>();
+ for (Row row : cloudMetaDf) {
+ String bucket = row.getString(0);
+ String key = row.getString(1);
+ String filePath = "s3://" + bucket + "/" + key;
Review comment:
Can we do it in one line?
```
String filePath = "s3://" + row.getString(0) + "/" + row.getString(1);
```
Just add a comment that 0 refers to bucket and 1 refers to key.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
+
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+
+ if (messageBody.has("Message")) {
+ // If this messages is from S3Event -> SNS -> SQS
+ messageMap =
+ (Map<String, Object>)
mapper.readValue(messageBody.getString("Message"), Map.class);
+ } else {
+ // If this messages is from S3Event -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.toString(), Map.class);
+ }
+ if (messageMap.containsKey("Records")) {
+ List<Map<String, Object>> records = (List<Map<String, Object>>)
messageMap.get("Records");
+ for (Map<String, Object> record : records) {
+ String eventName = (String) record.get("eventName");
+
+ // filter only allowed s3 event types
+ if
(ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+ eligibleRecords.add(record);
+ isMessageDelete = Boolean.FALSE;
+ processedMessages.add(message);
+
+ } else {
+ log.info("This S3 event " + eventName + " is not allowed, so
ignoring it.");
+ }
+ }
+ } else {
+ log.info("Message is not expected format or it's s3:TestEvent");
+ }
+ if (isMessageDelete) {
+ ineligibleMessages.add(message);
+ }
+ }
+ if (!ineligibleMessages.isEmpty()) {
+ deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
Review comment:
Is there a strict requirement to delete invalid messages right away
rather than deleting them in onCommit()?
We can simplify things. If processedMessages is not going to be used
anywhere except for deletion during onCommit(), I would vote to return all
messages in processedMessages and keep this simple.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
+
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+
+ if (messageBody.has("Message")) {
+ // If this messages is from S3Event -> SNS -> SQS
+ messageMap =
+ (Map<String, Object>)
mapper.readValue(messageBody.getString("Message"), Map.class);
+ } else {
+ // If this messages is from S3Event -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.toString(), Map.class);
+ }
+ if (messageMap.containsKey("Records")) {
+ List<Map<String, Object>> records = (List<Map<String, Object>>)
messageMap.get("Records");
+ for (Map<String, Object> record : records) {
+ String eventName = (String) record.get("eventName");
+
+ // filter only allowed s3 event types
+ if
(ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+ eligibleRecords.add(record);
+ isMessageDelete = Boolean.FALSE;
+ processedMessages.add(message);
+
+ } else {
+ log.info("This S3 event " + eventName + " is not allowed, so
ignoring it.");
+ }
+ }
+ } else {
+ log.info("Message is not expected format or it's s3:TestEvent");
+ }
+ if (isMessageDelete) {
+ ineligibleMessages.add(message);
+ }
+ }
+ if (!ineligibleMessages.isEmpty()) {
+ deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
+ }
+
+ return eligibleRecords;
+ }
+
+ /**
+ * Get the list of events from queue.
+ *
+ * @param sparkContext JavaSparkContext to help parallelize certain
operations
+ * @param lastCheckpointStr the last checkpoint time string, empty if first
run
+ * @return the list of events
+ */
+ public Pair<List<String>, String> getNextEventsFromQueue(
+ AmazonSQS sqs,
+ JavaSparkContext sparkContext,
+ Option<String> lastCheckpointStr,
+ List<Message> processedMessages) {
+
+ processedMessages.clear();
+
+ log.info("Reading messages....");
+
+ try {
+ log.info("Start Checkpoint : " + lastCheckpointStr);
+
+ long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+
+ List<Map<String, Object>> eligibleEventRecords = getEligibleEvents(sqs,
processedMessages);
+ log.info("eligible events size: " + eligibleEventRecords.size());
+
+ // sort all events by event time.
+ eligibleEventRecords.sort(
+ Comparator.comparingLong(
+ record ->
+ Date.from(
+ Instant.from(
+ DateTimeFormatter.ISO_INSTANT.parse(
+ (String) record.get("eventTime"))))
+ .getTime()));
+
+ List<String> filteredEventRecords = new ArrayList<>();
+ long newCheckpointTime = lastCheckpointTime;
+
+ for (Map<String, Object> eventRecord : eligibleEventRecords) {
+ newCheckpointTime =
+ Date.from(
+ Instant.from(
+ DateTimeFormatter.ISO_INSTANT.parse((String)
eventRecord.get("eventTime"))))
+ .getTime();
+
+ // Currently HUDI don't supports column names like request-amz-id-2
+ eventRecord.remove("responseElements");
+
+ filteredEventRecords.add(
+ new ObjectMapper().writeValueAsString(eventRecord).replace("%3D",
"="));
+ }
+ if (filteredEventRecords.isEmpty()) {
+ return new ImmutablePair<>(filteredEventRecords,
String.valueOf(newCheckpointTime));
+ }
+ return new ImmutablePair<>(filteredEventRecords,
String.valueOf(newCheckpointTime));
+ } catch (JSONException | IOException e) {
+ e.printStackTrace();
Review comment:
Can we avoid printStackTrace() please? HoodieException will take care of
it.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsHoodieIncrSource.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cloud Objects Hoodie Incr Source Class. {@link
CloudObjectsHoodieIncrSource}.This source will use
+ * the cloud files meta information form cloud meta hoodie table generate by
CloudObjectsMetaSource.
+ */
+public class CloudObjectsHoodieIncrSource extends HoodieIncrSource {
+
+ private static final Logger LOG =
LogManager.getLogger(CloudObjectsHoodieIncrSource.class);
+
+ public CloudObjectsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ super(props, sparkContext, sparkSession, schemaProvider);
+ }
+
+ @Override
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(
+ Option<String> lastCkptStr, long sourceLimit) {
+
+ DataSourceUtils.checkRequiredProperties(
+ props, Collections.singletonList(Config.HOODIE_SRC_BASE_PATH));
+
+ String srcPath = props.getString(Config.HOODIE_SRC_BASE_PATH);
+ int numInstantsPerFetch =
+ props.getInteger(Config.NUM_INSTANTS_PER_FETCH,
Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
+ boolean readLatestOnMissingCkpt =
+ props.getBoolean(
+ Config.READ_LATEST_INSTANT_ON_MISSING_CKPT,
+ Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+
+ // Use begin Instant if set and non-empty
+ Option<String> beginInstant =
+ lastCkptStr.isPresent()
+ ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr
+ : Option.empty();
+
+ Pair<String, String> instantEndpts =
+ IncrSourceHelper.calculateBeginAndEndInstants(
+ sparkContext, srcPath, numInstantsPerFetch, beginInstant,
readLatestOnMissingCkpt);
+
+ if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
+ LOG.warn("Already caught up. Begin Checkpoint was :" +
instantEndpts.getKey());
+ return Pair.of(Option.empty(), instantEndpts.getKey());
+ }
+
+ // Do Incr pull. Set end instant if available
+ DataFrameReader reader =
+ sparkSession
+ .read()
+ .format("org.apache.hudi")
+ .option(
+ DataSourceReadOptions.QUERY_TYPE().key(),
+ DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+ .option(
+ DataSourceReadOptions.BEGIN_INSTANTTIME().key(),
instantEndpts.getLeft())
+ .option(
+ DataSourceReadOptions.END_INSTANTTIME().key(),
instantEndpts.getRight());
+
+ Dataset<Row> source = reader.load(srcPath);
+
+ // Extract distinct file keys from cloud meta hoodie table
+ final List<Row> cloudMetaDf =
+ source
+ .filter("s3.object.size > 0")
+ .select("s3.bucket.name", "s3.object.key")
+ .distinct()
+ .collectAsList();
+
+ // Create S3 paths
+ List<String> cloudFiles = new ArrayList<>();
+ for (Row row : cloudMetaDf) {
+ String bucket = row.getString(0);
+ String key = row.getString(1);
+ String filePath = "s3://" + bucket + "/" + key;
+ cloudFiles.add(filePath);
+ }
+ String pathStr = String.join(",", cloudFiles);
Review comment:
Why do we join here and later split it from within fromFiles?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Cloud Objects Selector Class. This class has methods for processing cloud
objects. It currently
+ * supports only AWS S3 objects and AWS SQS queue.
+ */
+public class CloudObjectsSelector {
+ public static final List<String> ALLOWED_S3_EVENT_PREFIX =
+ Collections.singletonList("ObjectCreated");
+ public static volatile Logger log =
LogManager.getLogger(CloudObjectsSelector.class);
+ public final String queueUrl;
+ public final int longPollWait;
+ public final int maxMessagesEachRequest;
+ public final int maxMessageEachBatch;
+ public final int visibilityTimeout;
+ public final TypedProperties props;
+ public final String fsName;
+ private final String regionName;
+
+ /**
+ * Cloud Objects Selector Class. {@link CloudObjectsSelector}
+ */
+ public CloudObjectsSelector(TypedProperties props) {
+ DataSourceUtils.checkRequiredProperties(props,
Arrays.asList(Config.QUEUE_URL_PROP, Config.QUEUE_REGION));
+ this.props = props;
+ this.queueUrl = props.getString(Config.QUEUE_URL_PROP);
+ this.regionName = props.getString(Config.QUEUE_REGION);
+ this.fsName = props.getString(Config.SOURCE_QUEUE_FS_PROP,
"s3").toLowerCase();
+ this.longPollWait = props.getInteger(Config.QUEUE_LONGPOLLWAIT_PROP, 20);
+ this.maxMessageEachBatch =
props.getInteger(Config.QUEUE_MAXMESSAGESEACHBATCH_PROP, 5);
+ this.visibilityTimeout =
props.getInteger(Config.QUEUE_VISIBILITYTIMEOUT_PROP, 30);
+ this.maxMessagesEachRequest = 10;
+ }
+
+ /**
+ * Get SQS queue attributes.
+ *
+ * @param sqsClient AWSClient for sqsClient
+ * @param queueUrl queue full url
+ * @return map of attributes needed
+ */
+ protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient,
String queueUrl) {
+ GetQueueAttributesResult queueAttributesResult =
+ sqsClient.getQueueAttributes(
+ new GetQueueAttributesRequest(queueUrl)
+ .withAttributeNames("ApproximateNumberOfMessages"));
+ return queueAttributesResult.getAttributes();
+ }
+
+ /**
+ * Get the file attributes filePath, eventTime and size from JSONObject
record.
+ *
+ * @param record of object event
+ * @return map of file attribute
+ */
+ protected Map<String, Object> getFileAttributesFromRecord(JSONObject record)
+ throws UnsupportedEncodingException {
+
+ Map<String, Object> fileRecord = new HashMap<>();
+ String eventTimeStr = record.getString("eventTime");
+ long eventTime =
+
Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(eventTimeStr))).getTime();
+
+ JSONObject s3Object = record.getJSONObject("s3").getJSONObject("object");
+ String bucket =
+ URLDecoder.decode(
+
record.getJSONObject("s3").getJSONObject("bucket").getString("name"), "UTF-8");
+ String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
+ String filePath = this.fsName + "://" + bucket + "/" + key;
+
+ fileRecord.put("eventTime", eventTime);
+ fileRecord.put("fileSize", s3Object.getLong("size"));
+ fileRecord.put("filePath", filePath);
+ return fileRecord;
+ }
+
+ /**
+ * Amazon SQS Client Builder.
+ */
+ public AmazonSQS createAmazonSqsClient() {
+ return
AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build();
+ }
+
+ /**
+ * List messages from queue.
+ */
+ protected List<Message> getMessagesToProcess(
+ AmazonSQS sqsClient,
+ String queueUrl,
+ ReceiveMessageRequest receiveMessageRequest,
+ int maxMessageEachBatch,
+ int maxMessagesEachRequest) {
+ List<Message> messagesToProcess = new ArrayList<>();
+
+ // Get count for available messages
+ Map<String, String> queueAttributesResult =
getSqsQueueAttributes(sqsClient, queueUrl);
+ long approxMessagesAvailable =
+
Long.parseLong(queueAttributesResult.get("ApproximateNumberOfMessages"));
+ log.info("Approx. " + approxMessagesAvailable + " messages available in
queue.");
Review comment:
If we can set numMessagesToProcess = min(approxMessagesAvailable,
maxMessageEachBatch), we can avoid lines 159 to 161.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
Review comment:
Lines 89 to 94 can be moved into getMessagesToProcess(...) itself.
ReceiveMessageRequest is not really required in this method; it is used only
within getMessagesToProcess.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
Review comment:
Can we maintain the same terminology everywhere? The method is named
"...Events", whereas variables within are named eligible**Records**. Can
we have a uniform name for all of these — either events or records.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
+
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+
+ if (messageBody.has("Message")) {
+ // If this messages is from S3Event -> SNS -> SQS
+ messageMap =
+ (Map<String, Object>)
mapper.readValue(messageBody.getString("Message"), Map.class);
+ } else {
+ // If this messages is from S3Event -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.toString(), Map.class);
+ }
+ if (messageMap.containsKey("Records")) {
+ List<Map<String, Object>> records = (List<Map<String, Object>>)
messageMap.get("Records");
+ for (Map<String, Object> record : records) {
+ String eventName = (String) record.get("eventName");
+
+ // filter only allowed s3 event types
+ if
(ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+ eligibleRecords.add(record);
+ isMessageDelete = Boolean.FALSE;
+ processedMessages.add(message);
+
+ } else {
+ log.info("This S3 event " + eventName + " is not allowed, so
ignoring it.");
+ }
+ }
+ } else {
+ log.info("Message is not expected format or it's s3:TestEvent");
+ }
+ if (isMessageDelete) {
+ ineligibleMessages.add(message);
+ }
+ }
+ if (!ineligibleMessages.isEmpty()) {
+ deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
+ }
+
+ return eligibleRecords;
+ }
+
+ /**
+ * Get the list of events from queue.
+ *
+ * @param sparkContext JavaSparkContext to help parallelize certain
operations
+ * @param lastCheckpointStr the last checkpoint time string, empty if first
run
+ * @return the list of events
+ */
+ public Pair<List<String>, String> getNextEventsFromQueue(
+ AmazonSQS sqs,
+ JavaSparkContext sparkContext,
+ Option<String> lastCheckpointStr,
+ List<Message> processedMessages) {
+
+ processedMessages.clear();
+
+ log.info("Reading messages....");
+
+ try {
+ log.info("Start Checkpoint : " + lastCheckpointStr);
+
+ long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+
+ List<Map<String, Object>> eligibleEventRecords = getEligibleEvents(sqs,
processedMessages);
+ log.info("eligible events size: " + eligibleEventRecords.size());
+
+ // sort all events by event time.
+ eligibleEventRecords.sort(
+ Comparator.comparingLong(
+ record ->
+ Date.from(
+ Instant.from(
+ DateTimeFormatter.ISO_INSTANT.parse(
+ (String) record.get("eventTime"))))
+ .getTime()));
+
+ List<String> filteredEventRecords = new ArrayList<>();
+ long newCheckpointTime = lastCheckpointTime;
+
+ for (Map<String, Object> eventRecord : eligibleEventRecords) {
+ newCheckpointTime =
+ Date.from(
+ Instant.from(
+ DateTimeFormatter.ISO_INSTANT.parse((String)
eventRecord.get("eventTime"))))
+ .getTime();
+
+ // Currently HUDI don't supports column names like request-amz-id-2
+ eventRecord.remove("responseElements");
Review comment:
Lines 194 and 196 — can't we do this within getEligibleEvents()? Why do
manipulations in two places? Can you help me understand?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
Review comment:
can we move this entire processing of messages to a method.
So, high level, getEligibleEvents(..) should be like this.
```
allMsgs = getAllMessages(...)
eligibleEvents = processAndDeleteInValidMessages(...)
return eligibleEvents
```
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
+
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+
+ if (messageBody.has("Message")) {
+ // If this messages is from S3Event -> SNS -> SQS
+ messageMap =
+ (Map<String, Object>)
mapper.readValue(messageBody.getString("Message"), Map.class);
+ } else {
+ // If this messages is from S3Event -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.toString(), Map.class);
+ }
+ if (messageMap.containsKey("Records")) {
+ List<Map<String, Object>> records = (List<Map<String, Object>>)
messageMap.get("Records");
+ for (Map<String, Object> record : records) {
+ String eventName = (String) record.get("eventName");
+
+ // filter only allowed s3 event types
+ if
(ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+ eligibleRecords.add(record);
+ isMessageDelete = Boolean.FALSE;
+ processedMessages.add(message);
+
+ } else {
+ log.info("This S3 event " + eventName + " is not allowed, so
ignoring it.");
+ }
+ }
+ } else {
+ log.info("Message is not expected format or it's s3:TestEvent");
+ }
+ if (isMessageDelete) {
+ ineligibleMessages.add(message);
+ }
+ }
+ if (!ineligibleMessages.isEmpty()) {
+ deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
+ }
+
+ return eligibleRecords;
+ }
+
+ /**
+ * Get the list of events from queue.
+ *
+ * @param sparkContext JavaSparkContext to help parallelize certain
operations
+ * @param lastCheckpointStr the last checkpoint time string, empty if first
run
+ * @return the list of events
+ */
+ public Pair<List<String>, String> getNextEventsFromQueue(
+ AmazonSQS sqs,
+ JavaSparkContext sparkContext,
+ Option<String> lastCheckpointStr,
+ List<Message> processedMessages) {
+
+ processedMessages.clear();
+
+ log.info("Reading messages....");
+
+ try {
+ log.info("Start Checkpoint : " + lastCheckpointStr);
+
+ long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+
+ List<Map<String, Object>> eligibleEventRecords = getEligibleEvents(sqs,
processedMessages);
+ log.info("eligible events size: " + eligibleEventRecords.size());
+
+ // sort all events by event time.
+ eligibleEventRecords.sort(
+ Comparator.comparingLong(
+ record ->
+ Date.from(
+ Instant.from(
+ DateTimeFormatter.ISO_INSTANT.parse(
+ (String) record.get("eventTime"))))
+ .getTime()));
+
+ List<String> filteredEventRecords = new ArrayList<>();
+ long newCheckpointTime = lastCheckpointTime;
+
+ for (Map<String, Object> eventRecord : eligibleEventRecords) {
+ newCheckpointTime =
Review comment:
Is it possible to set the newCheckpoint outside the for loop? It should
refer to the last msg, right? Guess that's the reason why we sort the records
earlier, is it?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to
process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+ /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+ public CloudObjectsMetaSelector(TypedProperties props) {
+ super(props);
+ }
+
+ /**
+ * Factory method for creating custom CloudObjectsMetaSelector. Default
selector to use is {@link
+ * CloudObjectsMetaSelector}
+ */
+ public static CloudObjectsMetaSelector createSourceSelector(TypedProperties
props) {
+ String sourceSelectorClass =
+ props.getString(
+ CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+ CloudObjectsMetaSelector.class.getName());
+ try {
+ CloudObjectsMetaSelector selector =
+ (CloudObjectsMetaSelector)
+ ReflectionUtils.loadClass(
+ sourceSelectorClass, new Class<?>[] {TypedProperties.class},
props);
+
+ log.info("Using path selector " + selector.getClass().getName());
+ return selector;
+ } catch (Exception e) {
+ throw new HoodieException("Could not load source selector class " +
sourceSelectorClass, e);
+ }
+ }
+
+ /**
+ * List messages from queue, filter out illegible events while doing so. It
will also delete the
+ * ineligible messages from queue.
+ *
+ * @param processedMessages array of processed messages to add more messages
+ * @return the list of eligible records
+ */
+ protected List<Map<String, Object>> getEligibleEvents(
+ AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+ List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+ List<Message> ineligibleMessages = new ArrayList<>();
+
+ ReceiveMessageRequest receiveMessageRequest =
+ new ReceiveMessageRequest()
+ .withQueueUrl(this.queueUrl)
+ .withWaitTimeSeconds(this.longPollWait)
+ .withVisibilityTimeout(this.visibilityTimeout);
+ receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+ List<Message> messages =
+ getMessagesToProcess(
+ sqs,
+ this.queueUrl,
+ receiveMessageRequest,
+ this.maxMessageEachBatch,
+ this.maxMessagesEachRequest);
+
+ for (Message message : messages) {
+ boolean isMessageDelete = Boolean.TRUE;
+
+ JSONObject messageBody = new JSONObject(message.getBody());
+ Map<String, Object> messageMap;
+ ObjectMapper mapper = new ObjectMapper();
+
+ if (messageBody.has("Message")) {
+ // If this messages is from S3Event -> SNS -> SQS
+ messageMap =
+ (Map<String, Object>)
mapper.readValue(messageBody.getString("Message"), Map.class);
+ } else {
+ // If this messages is from S3Event -> SQS
+ messageMap = (Map<String, Object>)
mapper.readValue(messageBody.toString(), Map.class);
+ }
+ if (messageMap.containsKey("Records")) {
+ List<Map<String, Object>> records = (List<Map<String, Object>>)
messageMap.get("Records");
+ for (Map<String, Object> record : records) {
+ String eventName = (String) record.get("eventName");
+
+ // filter only allowed s3 event types
+ if
(ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+ eligibleRecords.add(record);
+ isMessageDelete = Boolean.FALSE;
+ processedMessages.add(message);
+
+ } else {
+ log.info("This S3 event " + eventName + " is not allowed, so
ignoring it.");
+ }
+ }
+ } else {
+ log.info("Message is not expected format or it's s3:TestEvent");
Review comment:
1. debug.
2. Do you think we need to add the msg value as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Implement DeltaStreamer Source for AWS S3
> -----------------------------------------
>
> Key: HUDI-1897
> URL: https://issues.apache.org/jira/browse/HUDI-1897
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: DeltaStreamer
> Reporter: Raymond Xu
> Priority: Critical
> Labels: pull-request-available
>
> Consider
> [https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html]
> and
> https://docs.databricks.com/spark/latest/structured-streaming/sqs.html
--
This message was sent by Atlassian Jira
(v8.3.4#803005)