echauchot commented on a change in pull request #16478:
URL: https://github.com/apache/beam/pull/16478#discussion_r789534496
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
Review comment:
please add a "." after BLOCKING for readability
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
+ * immediate redelivery.
+ */
+ void expireBatchForRedelivery(List<String> receiptHandles) throws
IOException {
+ List<KV<String, String>> messages =
+ mapWithIndex(receiptHandles.stream(), (handle, idx) ->
KV.of(Long.toString(idx), handle))
+ .collect(toList());
+
+ long nowMsSinceEpoch = now();
+ extendBatch(nowMsSinceEpoch, messages, 0);
+ numReleased.add(nowMsSinceEpoch, receiptHandles.size());
+ }
+
+ /**
+ * BLOCKING Extend the SQS visibility timeout for messages in {@code
messages} as {@link KV} of
+ * message id, receipt handle.
*/
- void extendBatch(long nowMsSinceEpoch, List<String> receiptHandles, int
extensionSec)
+ void extendBatch(long nowMsSinceEpoch, List<KV<String, String>> messages,
int extensionSec)
Review comment:
Is "extend" the correct word, given that it does not add a delay but rather
sets the timeout?
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -509,70 +524,87 @@ void maybeCloseClient() throws IOException {
}
}
- /** delete the provided {@code messageIds} from SQS. */
+ /**
+ * Delete the provided {@code messageIds} from SQS in multiple batches. Each
batch except the last
+ * one is of size {@code DELETE_BATCH_SIZE}. Message ids that already got
removed from {@code
+ * inFlight} messages are ignored.
+ *
+ * <p>CAUTION: May be invoked from a separate thread.
+ */
void delete(List<String> messageIds) throws IOException {
- AtomicInteger counter = new AtomicInteger();
- for (List<String> messageList :
- messageIds.stream()
- .collect(groupingBy(x -> counter.getAndIncrement() /
DELETE_BATCH_SIZE))
- .values()) {
- deleteBatch(messageList);
+ ArrayList<String> receiptHandles = new ArrayList<>(DELETE_BATCH_SIZE);
+ for (String msgId : messageIds) {
+ InFlightState state = inFlight.get(msgId);
+ if (state == null) {
+ continue;
+ }
+ receiptHandles.add(state.receiptHandle);
+ if (receiptHandles.size() == DELETE_BATCH_SIZE) {
+ deleteBatch(receiptHandles);
+ receiptHandles.clear();
+ }
+ }
+ if (!receiptHandles.isEmpty()) {
+ deleteBatch(receiptHandles);
}
+ deletedIds.add(messageIds);
}
/**
- * delete the provided {@code messageIds} from SQS, blocking until all of
the messages are
- * deleted.
+ * Delete the provided {@code receiptHandles} from SQS. Blocking until all
messages are deleted.
*
* <p>CAUTION: May be invoked from a separate thread.
- *
- * <p>CAUTION: Retains {@code messageIds}.
*/
- private void deleteBatch(List<String> messageIds) throws IOException {
+ private void deleteBatch(List<String> receiptHandles) throws IOException {
int retries = 0;
- Map<String, String> pendingReceipts =
- IntStream.range(0, messageIds.size())
- .boxed()
- .filter(i -> inFlight.containsKey(messageIds.get(i)))
- .collect(toMap(Object::toString, i ->
inFlight.get(messageIds.get(i)).receiptHandle));
- while (!pendingReceipts.isEmpty()) {
+ FunctionWithIndex<String, DeleteMessageBatchRequestEntry> buildEntry =
+ (handle, id) ->
+ DeleteMessageBatchRequestEntry.builder()
+ .id(Long.toString(id))
+ .receiptHandle(handle)
+ .build();
+
+ Map<String, DeleteMessageBatchRequestEntry> pendingDeletes =
+ mapWithIndex(receiptHandles.stream(), buildEntry).collect(toMap(e ->
e.id(), identity()));
+
+ while (!pendingDeletes.isEmpty()) {
if (retries >= BATCH_OPERATION_MAX_RETIRES) {
throw new IOException(
- "Failed to extend visibility timeout for "
- + pendingReceipts.size()
+ "Failed to delete "
+ + pendingDeletes.size()
+ " messages after "
+ retries
+ " retries");
}
- List<DeleteMessageBatchRequestEntry> entries =
- pendingReceipts.entrySet().stream()
- .map(
- r ->
- DeleteMessageBatchRequestEntry.builder()
- .id(r.getKey())
- .receiptHandle(r.getValue())
- .build())
- .collect(Collectors.toList());
-
DeleteMessageBatchResponse result =
sqsClient.deleteMessageBatch(
DeleteMessageBatchRequest.builder()
- .queueUrl(source.getRead().queueUrl())
- .entries(entries)
+ .queueUrl(queueUrl())
+ .entries(pendingDeletes.values())
.build());
- // Reflect failed message IDs to map
- pendingReceipts
- .keySet()
- .retainAll(
-
result.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toSet()));
+ Map<Boolean, Set<String>> failures =
+ result.failed().stream()
+ .collect(partitioningBy(this::isHandleInvalid, mapping(e ->
e.id(), toSet())));
+
+ // Keep failed IDs only, but discard invalid receipt handles
+ pendingDeletes.keySet().retainAll(failures.getOrDefault(FALSE,
ImmutableSet.of()));
Review comment:
Messages corresponding to expired handles do not need to be deleted
because they are no longer part of the SQS broker, right?
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
public class SqsUnboundedReaderTest {
private static final String DATA = "testData";
- @Rule public TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static EmbeddedSqsServer sqsServer = new
EmbeddedSqsServer();
- @Rule public EmbeddedSqsServer embeddedSqsRestServer = new
EmbeddedSqsServer();
+ @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
- private SqsUnboundedSource source;
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ public SqsUnboundedSource mockSource;
- private void setupOneMessage() {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(1));
- }
-
- private void setupMessages(List<String> messages) {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+ private void setupMessages(String... messages) {
+ final SqsClient client = testCase.getClient();
+ final String queueUrl = testCase.getQueueUrl();
for (String message : messages) {
- client.sendMessage(
-
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+ client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
}
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(messages.size()));
+
+
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
Review comment:
better, clearer
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
public class SqsUnboundedReaderTest {
private static final String DATA = "testData";
- @Rule public TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static EmbeddedSqsServer sqsServer = new
EmbeddedSqsServer();
- @Rule public EmbeddedSqsServer embeddedSqsRestServer = new
EmbeddedSqsServer();
+ @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
- private SqsUnboundedSource source;
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ public SqsUnboundedSource mockSource;
- private void setupOneMessage() {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(1));
- }
-
- private void setupMessages(List<String> messages) {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+ private void setupMessages(String... messages) {
+ final SqsClient client = testCase.getClient();
+ final String queueUrl = testCase.getQueueUrl();
for (String message : messages) {
- client.sendMessage(
-
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+ client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
}
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(messages.size()));
+
+
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+ when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
}
@Test
public void testReadOneMessage() throws IOException {
- setupOneMessage();
- UnboundedSource.UnboundedReader<SqsMessage> reader =
- source.createReader(pipeline.getOptions(), null);
+ setupMessages(DATA);
+ SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
// Read one message.
assertTrue(reader.start());
assertEquals(DATA, reader.getCurrent().getBody());
assertFalse(reader.advance());
+
// ACK the message.
UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader.close();
}
@Test
- public void testTimeoutAckAndRereadOneMessage() throws IOException {
- setupOneMessage();
- UnboundedSource.UnboundedReader<SqsMessage> reader =
- source.createReader(pipeline.getOptions(), null);
- SqsClient sqsClient = embeddedSqsRestServer.getClient();
+ public void testAckDeletedMessage() throws IOException {
+ setupMessages(DATA);
+ SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
+ // Read one message
+ assertTrue(reader.start());
+ assertEquals(DATA, reader.getCurrent().getBody());
+ String receiptHandle = reader.getCurrent().getReceiptHandle();
+ assertFalse(reader.advance());
+
+ // Simulate already ACKed message after re-delivery to different reader
+ testCase
+ .getClient()
+ .deleteMessage(b ->
b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));
+
+ // Now ACK the message.
+ UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
+ checkpoint.finalizeCheckpoint();
+ reader.close();
+ }
+
+ @Test
+ @Ignore("Behavior of SQSRestServer is broken")
Review comment:
Why is this broken? Why add a test that is already broken? What is
missing to fix it?
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
public class SqsUnboundedReaderTest {
private static final String DATA = "testData";
- @Rule public TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static EmbeddedSqsServer sqsServer = new
EmbeddedSqsServer();
- @Rule public EmbeddedSqsServer embeddedSqsRestServer = new
EmbeddedSqsServer();
+ @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
- private SqsUnboundedSource source;
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ public SqsUnboundedSource mockSource;
- private void setupOneMessage() {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(1));
- }
-
- private void setupMessages(List<String> messages) {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+ private void setupMessages(String... messages) {
+ final SqsClient client = testCase.getClient();
+ final String queueUrl = testCase.getQueueUrl();
for (String message : messages) {
- client.sendMessage(
-
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+ client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
}
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(messages.size()));
+
+
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+ when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
}
@Test
public void testReadOneMessage() throws IOException {
- setupOneMessage();
- UnboundedSource.UnboundedReader<SqsMessage> reader =
- source.createReader(pipeline.getOptions(), null);
+ setupMessages(DATA);
+ SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
// Read one message.
assertTrue(reader.start());
assertEquals(DATA, reader.getCurrent().getBody());
assertFalse(reader.advance());
+
// ACK the message.
UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader.close();
}
@Test
- public void testTimeoutAckAndRereadOneMessage() throws IOException {
- setupOneMessage();
- UnboundedSource.UnboundedReader<SqsMessage> reader =
- source.createReader(pipeline.getOptions(), null);
- SqsClient sqsClient = embeddedSqsRestServer.getClient();
+ public void testAckDeletedMessage() throws IOException {
+ setupMessages(DATA);
+ SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
+ // Read one message
+ assertTrue(reader.start());
+ assertEquals(DATA, reader.getCurrent().getBody());
+ String receiptHandle = reader.getCurrent().getReceiptHandle();
+ assertFalse(reader.advance());
+
+ // Simulate already ACKed message after re-delivery to different reader
+ testCase
+ .getClient()
+ .deleteMessage(b ->
b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));
+
+ // Now ACK the message.
+ UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
+ checkpoint.finalizeCheckpoint();
+ reader.close();
+ }
+
+ @Test
+ @Ignore("Behavior of SQSRestServer is broken")
+ public void testExtendDeletedMessage() throws IOException {
+ setupMessages(DATA);
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).thenReturn(currentTimeMillis());
+
+ SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null,
clock);
+
+ // Read one message
+ assertTrue(reader.start());
+ assertEquals(DATA, reader.getCurrent().getBody());
+
+ // Simulate already ACKed message after re-delivery to different reader
+ String receiptHandle = reader.getCurrent().getReceiptHandle();
+ testCase
+ .getClient()
+ .deleteMessage(b ->
b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));
+
+ // Forward time to force extension of visibility
+ when(clock.millis()).thenReturn(currentTimeMillis() +
reader.getVisibilityTimeoutMs() * 8 / 10);
+
+ // Advancing the reader will attempt extending the visibility
+ assertFalse(reader.advance());
Review comment:
This should be false because `reader.start()` has already read the first
message and it should not have been resent because it was invalidated, right?
If so, please update the comment to explain more.
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
+ * immediate redelivery.
+ */
+ void expireBatchForRedelivery(List<String> receiptHandles) throws
IOException {
Review comment:
thx for improving the clarity
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -509,70 +524,87 @@ void maybeCloseClient() throws IOException {
}
}
- /** delete the provided {@code messageIds} from SQS. */
+ /**
+ * Delete the provided {@code messageIds} from SQS in multiple batches. Each
batch except the last
Review comment:
Thanks for adding these clarifications.
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
+ * immediate redelivery.
+ */
+ void expireBatchForRedelivery(List<String> receiptHandles) throws
IOException {
+ List<KV<String, String>> messages =
+ mapWithIndex(receiptHandles.stream(), (handle, idx) ->
KV.of(Long.toString(idx), handle))
+ .collect(toList());
+
+ long nowMsSinceEpoch = now();
+ extendBatch(nowMsSinceEpoch, messages, 0);
+ numReleased.add(nowMsSinceEpoch, receiptHandles.size());
+ }
+
+ /**
+ * BLOCKING Extend the SQS visibility timeout for messages in {@code
messages} as {@link KV} of
+ * message id, receipt handle.
*/
- void extendBatch(long nowMsSinceEpoch, List<String> receiptHandles, int
extensionSec)
+ void extendBatch(long nowMsSinceEpoch, List<KV<String, String>> messages,
int extensionSec)
throws IOException {
int retries = 0;
- int numMessages = receiptHandles.size();
- Map<String, String> pendingReceipts =
- IntStream.range(0, receiptHandles.size())
- .boxed()
- .collect(toMap(Object::toString, receiptHandles::get));
- while (!pendingReceipts.isEmpty()) {
+ Function<KV<String, String>, ChangeMessageVisibilityBatchRequestEntry>
buildEntry =
+ kv ->
+ ChangeMessageVisibilityBatchRequestEntry.builder()
+ .visibilityTimeout(extensionSec)
+ .id(kv.getKey())
+ .receiptHandle(kv.getValue())
+ .build();
+
+ Map<String, ChangeMessageVisibilityBatchRequestEntry> pendingExtends =
+ messages.stream().collect(toMap(KV::getKey, buildEntry));
+
+ while (!pendingExtends.isEmpty()) {
if (retries >= BATCH_OPERATION_MAX_RETIRES) {
throw new IOException(
"Failed to extend visibility timeout for "
- + receiptHandles.size()
+ + messages.size()
+ " messages after "
+ retries
+ " retries");
}
- List<ChangeMessageVisibilityBatchRequestEntry> entries =
- pendingReceipts.entrySet().stream()
- .map(
- r ->
- ChangeMessageVisibilityBatchRequestEntry.builder()
- .id(r.getKey())
- .receiptHandle(r.getValue())
- .visibilityTimeout(extensionSec)
- .build())
- .collect(Collectors.toList());
-
ChangeMessageVisibilityBatchResponse response =
sqsClient.changeMessageVisibilityBatch(
ChangeMessageVisibilityBatchRequest.builder()
- .queueUrl(source.getRead().queueUrl())
- .entries(entries)
+ .queueUrl(queueUrl())
+ .entries(pendingExtends.values())
.build());
- pendingReceipts
- .keySet()
- .retainAll(
- response.failed().stream()
- .map(BatchResultErrorEntry::id)
- .collect(Collectors.toSet()));
+ Map<Boolean, Set<String>> failures =
+ response.failed().stream()
+ .collect(partitioningBy(this::isHandleInvalid, mapping(e ->
e.id(), toSet())));
+
+ // Keep failed IDs only, but discard invalid (expired) receipt handles
+ pendingExtends.keySet().retainAll(failures.getOrDefault(FALSE,
ImmutableSet.of()));
+
+ // Skip stats if explicitly expiring messages for immediate redelivery
+ if (extensionSec > 0) {
+ numExtendedDeadlines.add(nowMsSinceEpoch,
response.successful().size());
+
+ Set<String> invalidMsgIds = failures.getOrDefault(TRUE,
ImmutableSet.of());
+ if (invalidMsgIds.size() > 0) {
+ // consider invalid (expired) messages no longer in flight
+ numLateDeadlines.add(nowMsSinceEpoch, invalidMsgIds.size());
+ for (String msgId : invalidMsgIds) {
+ inFlight.remove(msgId);
+ }
+ LOG.warn(
+ "Failed to extend visibility timeout for {} messages due to
expired receipt handles.",
Review comment:
please rephrase "Failed to extend visibility timeout for {} messages
with expired receipt handles."
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
Review comment:
Good for test checks, but maybe we should even consider using the
`MockitoJUnitRunner.StrictStubs` variant of this runner.
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java
##########
@@ -56,7 +57,9 @@
@VisibleForTesting final List<String> notYetReadReceipts;
Review comment:
Can you also update the javadoc? Main sentence lacks a verb
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
+ * immediate redelivery.
+ */
+ void expireBatchForRedelivery(List<String> receiptHandles) throws
IOException {
+ List<KV<String, String>> messages =
+ mapWithIndex(receiptHandles.stream(), (handle, idx) ->
KV.of(Long.toString(idx), handle))
+ .collect(toList());
+
+ long nowMsSinceEpoch = now();
+ extendBatch(nowMsSinceEpoch, messages, 0);
+ numReleased.add(nowMsSinceEpoch, receiptHandles.size());
+ }
+
+ /**
+ * BLOCKING Extend the SQS visibility timeout for messages in {@code
messages} as {@link KV} of
+ * message id, receipt handle.
*/
- void extendBatch(long nowMsSinceEpoch, List<String> receiptHandles, int
extensionSec)
+ void extendBatch(long nowMsSinceEpoch, List<KV<String, String>> messages,
int extensionSec)
throws IOException {
int retries = 0;
- int numMessages = receiptHandles.size();
- Map<String, String> pendingReceipts =
- IntStream.range(0, receiptHandles.size())
- .boxed()
- .collect(toMap(Object::toString, receiptHandles::get));
- while (!pendingReceipts.isEmpty()) {
+ Function<KV<String, String>, ChangeMessageVisibilityBatchRequestEntry>
buildEntry =
+ kv ->
+ ChangeMessageVisibilityBatchRequestEntry.builder()
+ .visibilityTimeout(extensionSec)
+ .id(kv.getKey())
+ .receiptHandle(kv.getValue())
+ .build();
+
+ Map<String, ChangeMessageVisibilityBatchRequestEntry> pendingExtends =
+ messages.stream().collect(toMap(KV::getKey, buildEntry));
+
+ while (!pendingExtends.isEmpty()) {
if (retries >= BATCH_OPERATION_MAX_RETIRES) {
throw new IOException(
"Failed to extend visibility timeout for "
- + receiptHandles.size()
+ + messages.size()
+ " messages after "
+ retries
+ " retries");
}
- List<ChangeMessageVisibilityBatchRequestEntry> entries =
- pendingReceipts.entrySet().stream()
- .map(
- r ->
- ChangeMessageVisibilityBatchRequestEntry.builder()
- .id(r.getKey())
- .receiptHandle(r.getValue())
- .visibilityTimeout(extensionSec)
- .build())
- .collect(Collectors.toList());
-
ChangeMessageVisibilityBatchResponse response =
sqsClient.changeMessageVisibilityBatch(
ChangeMessageVisibilityBatchRequest.builder()
- .queueUrl(source.getRead().queueUrl())
- .entries(entries)
+ .queueUrl(queueUrl())
+ .entries(pendingExtends.values())
.build());
- pendingReceipts
- .keySet()
- .retainAll(
- response.failed().stream()
- .map(BatchResultErrorEntry::id)
- .collect(Collectors.toSet()));
+ Map<Boolean, Set<String>> failures =
+ response.failed().stream()
+ .collect(partitioningBy(this::isHandleInvalid, mapping(e ->
e.id(), toSet())));
+
+ // Keep failed IDs only, but discard invalid (expired) receipt handles
+ pendingExtends.keySet().retainAll(failures.getOrDefault(FALSE,
ImmutableSet.of()));
+
+ // Skip stats if explicitly expiring messages for immediate redelivery
Review comment:
sed s/stats/stats update and inFlight management/g
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
+ * immediate redelivery.
+ */
+ void expireBatchForRedelivery(List<String> receiptHandles) throws
IOException {
+ List<KV<String, String>> messages =
+ mapWithIndex(receiptHandles.stream(), (handle, idx) ->
KV.of(Long.toString(idx), handle))
+ .collect(toList());
+
+ long nowMsSinceEpoch = now();
+ extendBatch(nowMsSinceEpoch, messages, 0);
+ numReleased.add(nowMsSinceEpoch, receiptHandles.size());
+ }
+
+ /**
+ * BLOCKING Extend the SQS visibility timeout for messages in {@code
messages} as {@link KV} of
Review comment:
ditto
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
public class SqsUnboundedReaderTest {
private static final String DATA = "testData";
- @Rule public TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static EmbeddedSqsServer sqsServer = new
EmbeddedSqsServer();
- @Rule public EmbeddedSqsServer embeddedSqsRestServer = new
EmbeddedSqsServer();
+ @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
- private SqsUnboundedSource source;
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ public SqsUnboundedSource mockSource;
- private void setupOneMessage() {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(1));
- }
-
- private void setupMessages(List<String> messages) {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+ private void setupMessages(String... messages) {
+ final SqsClient client = testCase.getClient();
+ final String queueUrl = testCase.getQueueUrl();
for (String message : messages) {
- client.sendMessage(
-
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+ client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
}
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(messages.size()));
+
+
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+ when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
}
@Test
public void testReadOneMessage() throws IOException {
- setupOneMessage();
- UnboundedSource.UnboundedReader<SqsMessage> reader =
- source.createReader(pipeline.getOptions(), null);
+ setupMessages(DATA);
+ SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
// Read one message.
assertTrue(reader.start());
assertEquals(DATA, reader.getCurrent().getBody());
assertFalse(reader.advance());
+
// ACK the message.
UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader.close();
}
@Test
- public void testTimeoutAckAndRereadOneMessage() throws IOException {
- setupOneMessage();
- UnboundedSource.UnboundedReader<SqsMessage> reader =
- source.createReader(pipeline.getOptions(), null);
- SqsClient sqsClient = embeddedSqsRestServer.getClient();
+ public void testAckDeletedMessage() throws IOException {
Review comment:
I need a confirmation: is this the test that verifies the PR change?
You simulate an invalid (expired) receiptHandle by deleting the
message. Then you expect the checkpoint finalization not to throw an
exception?
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
public class SqsUnboundedReaderTest {
private static final String DATA = "testData";
- @Rule public TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static EmbeddedSqsServer sqsServer = new
EmbeddedSqsServer();
- @Rule public EmbeddedSqsServer embeddedSqsRestServer = new
EmbeddedSqsServer();
+ @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
- private SqsUnboundedSource source;
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ public SqsUnboundedSource mockSource;
- private void setupOneMessage() {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(1));
- }
-
- private void setupMessages(List<String> messages) {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+ private void setupMessages(String... messages) {
+ final SqsClient client = testCase.getClient();
+ final String queueUrl = testCase.getQueueUrl();
for (String message : messages) {
- client.sendMessage(
-
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+ client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
}
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(messages.size()));
+
+
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+ when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
}
@Test
public void testReadOneMessage() throws IOException {
- setupOneMessage();
- UnboundedSource.UnboundedReader<SqsMessage> reader =
- source.createReader(pipeline.getOptions(), null);
+ setupMessages(DATA);
+ SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
// Read one message.
assertTrue(reader.start());
assertEquals(DATA, reader.getCurrent().getBody());
assertFalse(reader.advance());
+
// ACK the message.
UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader.close();
}
@Test
- public void testTimeoutAckAndRereadOneMessage() throws IOException {
- setupOneMessage();
- UnboundedSource.UnboundedReader<SqsMessage> reader =
- source.createReader(pipeline.getOptions(), null);
- SqsClient sqsClient = embeddedSqsRestServer.getClient();
+ public void testAckDeletedMessage() throws IOException {
+ setupMessages(DATA);
+ SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
+ // Read one message
+ assertTrue(reader.start());
+ assertEquals(DATA, reader.getCurrent().getBody());
+ String receiptHandle = reader.getCurrent().getReceiptHandle();
+ assertFalse(reader.advance());
Review comment:
yes, better to assert that it is the last and single message, indeed
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOTest.java
##########
@@ -35,19 +37,18 @@
/** Tests on {@link SqsIO}. */
@RunWith(JUnit4.class)
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
public class SqsIOTest {
- @Rule public TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static EmbeddedSqsServer sqsServer = new
EmbeddedSqsServer();
Review comment:
no kidding, before this change, the server was created with each test ? !
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
public class SqsUnboundedReaderTest {
private static final String DATA = "testData";
- @Rule public TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static EmbeddedSqsServer sqsServer = new
EmbeddedSqsServer();
- @Rule public EmbeddedSqsServer embeddedSqsRestServer = new
EmbeddedSqsServer();
+ @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
- private SqsUnboundedSource source;
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ public SqsUnboundedSource mockSource;
- private void setupOneMessage() {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(1));
- }
-
- private void setupMessages(List<String> messages) {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+ private void setupMessages(String... messages) {
+ final SqsClient client = testCase.getClient();
+ final String queueUrl = testCase.getQueueUrl();
for (String message : messages) {
- client.sendMessage(
-
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+ client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
}
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(messages.size()));
Review comment:
Why is `maxNumRecords` no longer set?
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
public class SqsUnboundedReaderTest {
private static final String DATA = "testData";
- @Rule public TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static EmbeddedSqsServer sqsServer = new
EmbeddedSqsServer();
- @Rule public EmbeddedSqsServer embeddedSqsRestServer = new
EmbeddedSqsServer();
+ @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
- private SqsUnboundedSource source;
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ public SqsUnboundedSource mockSource;
- private void setupOneMessage() {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
- source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(1));
- }
-
- private void setupMessages(List<String> messages) {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+ private void setupMessages(String... messages) {
Review comment:
thanks for removing the big copy/paste :+1:
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/EmbeddedSqsServer.java
##########
@@ -25,44 +27,57 @@
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
-import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
class EmbeddedSqsServer extends ExternalResource {
- private static SQSRestServer sqsRestServer;
- private static SqsClient client;
- private static String queueUrl;
- private static String queueName = "test";
+ private SQSRestServer sqsRestServer;
+ private URI endpoint;
@Override
protected void before() {
sqsRestServer = SQSRestServerBuilder.withDynamicPort().start();
int port = sqsRestServer.waitUntilStarted().localAddress().getPort();
- client =
- SqsClient.builder()
- .credentialsProvider(
-
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
- .endpointOverride(URI.create(String.format("http://localhost:%d",
port)))
- .region(Region.US_WEST_2)
- .build();
-
- CreateQueueRequest createQueueRequest =
- CreateQueueRequest.builder().queueName(queueName).build();
- final CreateQueueResponse queue = client.createQueue(createQueueRequest);
- queueUrl = queue.queueUrl();
- }
-
- public SqsClient getClient() {
- return client;
- }
-
- public String getQueueUrl() {
- return queueUrl;
+ endpoint = URI.create(String.format("http://localhost:%d", port));
}
@Override
protected void after() {
sqsRestServer.stopAndWait();
- client.close();
+ }
+
+ /** Isolated environment (queue, client) per test case. */
Review comment:
yes, good point ! thanks !
It is indeed needed, as tests can run in parallel. Some tests also use the
thread id to randomize resource names, but your randomization method looks good
to me.
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSourceTest.java
##########
@@ -17,36 +17,20 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static org.mockito.Mockito.mock;
+
import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedSource}. */
@RunWith(JUnit4.class)
public class SqsUnboundedSourceTest {
- private static final String DATA = "testData";
-
- @Rule public TestPipeline pipeline = TestPipeline.create();
-
- @Rule public EmbeddedSqsServer embeddedSqsRestServer = new
EmbeddedSqsServer();
-
@Test
public void testCheckpointCoderIsSane() {
- final SqsClient client = embeddedSqsRestServer.getClient();
- final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
- SqsUnboundedSource source =
- new SqsUnboundedSource(
- SqsIO.read()
- .withQueueUrl(queueUrl)
- .withSqsClientProvider(SqsClientProviderMock.of(client))
- .withMaxNumRecords(1));
+ SqsUnboundedSource source = new SqsUnboundedSource(mock(SqsIO.Read.class));
Review comment:
Thanks for this simplification! Indeed, this only needs to be tested at the
source level; there is no need for the higher-level stuff (server, pipeline,
sending a message).
##########
File path:
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSourceTest.java
##########
@@ -17,36 +17,20 @@
*/
package org.apache.beam.sdk.io.aws2.sqs;
+import static org.mockito.Mockito.mock;
+
import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
/** Tests on {@link SqsUnboundedSource}. */
@RunWith(JUnit4.class)
Review comment:
Use mockito junit runner ?
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
+ * immediate redelivery.
+ */
+ void expireBatchForRedelivery(List<String> receiptHandles) throws
IOException {
Review comment:
Out of curiosity, is setting the timeout to 0 enough for retrial of
the message (restoring checkpoint)? Which service does the actual retrial?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]