mosche commented on a change in pull request #16478:
URL: https://github.com/apache/beam/pull/16478#discussion_r791410860



##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOTest.java
##########
@@ -35,19 +37,18 @@
 
 /** Tests on {@link SqsIO}. */
 @RunWith(JUnit4.class)
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public class SqsIOTest {
 
-  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @ClassRule public static EmbeddedSqsServer sqsServer = new 
EmbeddedSqsServer();

Review comment:
       Sadly, yes

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
 import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
 public class SqsUnboundedReaderTest {
   private static final String DATA = "testData";
 
-  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @ClassRule public static EmbeddedSqsServer sqsServer = new 
EmbeddedSqsServer();
 
-  @Rule public EmbeddedSqsServer embeddedSqsRestServer = new 
EmbeddedSqsServer();
+  @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
 
-  private SqsUnboundedSource source;
+  @Mock(answer = RETURNS_DEEP_STUBS)
+  public SqsUnboundedSource mockSource;
 
-  private void setupOneMessage() {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-    
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(1));
-  }
-
-  private void setupMessages(List<String> messages) {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+  private void setupMessages(String... messages) {
+    final SqsClient client = testCase.getClient();
+    final String queueUrl = testCase.getQueueUrl();
     for (String message : messages) {
-      client.sendMessage(
-          
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+      client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
     }
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(messages.size()));

Review comment:
       This is only meant to test the reader. The whole source is mocked now, 
as constructing a real one just distracts from what's being tested and only 
complicates things. All the unnecessary setup is gone now.

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
 import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
 public class SqsUnboundedReaderTest {
   private static final String DATA = "testData";
 
-  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @ClassRule public static EmbeddedSqsServer sqsServer = new 
EmbeddedSqsServer();
 
-  @Rule public EmbeddedSqsServer embeddedSqsRestServer = new 
EmbeddedSqsServer();
+  @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
 
-  private SqsUnboundedSource source;
+  @Mock(answer = RETURNS_DEEP_STUBS)
+  public SqsUnboundedSource mockSource;
 
-  private void setupOneMessage() {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-    
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(1));
-  }
-
-  private void setupMessages(List<String> messages) {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+  private void setupMessages(String... messages) {
+    final SqsClient client = testCase.getClient();
+    final String queueUrl = testCase.getQueueUrl();
     for (String message : messages) {
-      client.sendMessage(
-          
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+      client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
     }
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(messages.size()));
+
+    
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+    when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
   }
 
   @Test
   public void testReadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
+    setupMessages(DATA);
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
     // Read one message.
     assertTrue(reader.start());
     assertEquals(DATA, reader.getCurrent().getBody());
     assertFalse(reader.advance());
+
     // ACK the message.
     UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
 
   @Test
-  public void testTimeoutAckAndRereadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
-    SqsClient sqsClient = embeddedSqsRestServer.getClient();
+  public void testAckDeletedMessage() throws IOException {

Review comment:
       Exactly 👍 

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
 import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
 public class SqsUnboundedReaderTest {
   private static final String DATA = "testData";
 
-  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @ClassRule public static EmbeddedSqsServer sqsServer = new 
EmbeddedSqsServer();
 
-  @Rule public EmbeddedSqsServer embeddedSqsRestServer = new 
EmbeddedSqsServer();
+  @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
 
-  private SqsUnboundedSource source;
+  @Mock(answer = RETURNS_DEEP_STUBS)
+  public SqsUnboundedSource mockSource;
 
-  private void setupOneMessage() {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-    
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(1));
-  }
-
-  private void setupMessages(List<String> messages) {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+  private void setupMessages(String... messages) {
+    final SqsClient client = testCase.getClient();
+    final String queueUrl = testCase.getQueueUrl();
     for (String message : messages) {
-      client.sendMessage(
-          
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+      client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
     }
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(messages.size()));
+
+    
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+    when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
   }
 
   @Test
   public void testReadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
+    setupMessages(DATA);
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
     // Read one message.
     assertTrue(reader.start());
     assertEquals(DATA, reader.getCurrent().getBody());
     assertFalse(reader.advance());
+
     // ACK the message.
     UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
 
   @Test
-  public void testTimeoutAckAndRereadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
-    SqsClient sqsClient = embeddedSqsRestServer.getClient();
+  public void testAckDeletedMessage() throws IOException {
+    setupMessages(DATA);
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
+    // Read one message
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent().getBody());
+    String receiptHandle = reader.getCurrent().getReceiptHandle();
+    assertFalse(reader.advance());
+
+    // Simulate already ACKed message after re-delivery to different reader
+    testCase
+        .getClient()
+        .deleteMessage(b -> 
b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));
+
+    // Now ACK the message.
+    UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  @Ignore("Behavior of SQSRestServer is broken")

Review comment:
       The batch API of ElasticMQ (used for testing here) doesn't behave 
correctly in this case :/
   
   The test is supposed to verify the same fix for changing visibility 
timeouts as the one tested above for deleting messages.
   When I find time, I intend to fix the issue in ElasticMQ. For now, I've 
kept the (ignored) test case primarily as documentation.

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
 import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
 public class SqsUnboundedReaderTest {
   private static final String DATA = "testData";
 
-  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @ClassRule public static EmbeddedSqsServer sqsServer = new 
EmbeddedSqsServer();
 
-  @Rule public EmbeddedSqsServer embeddedSqsRestServer = new 
EmbeddedSqsServer();
+  @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
 
-  private SqsUnboundedSource source;
+  @Mock(answer = RETURNS_DEEP_STUBS)
+  public SqsUnboundedSource mockSource;
 
-  private void setupOneMessage() {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-    
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(1));
-  }
-
-  private void setupMessages(List<String> messages) {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+  private void setupMessages(String... messages) {
+    final SqsClient client = testCase.getClient();
+    final String queueUrl = testCase.getQueueUrl();
     for (String message : messages) {
-      client.sendMessage(
-          
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+      client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
     }
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(messages.size()));
+
+    
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+    when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
   }
 
   @Test
   public void testReadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
+    setupMessages(DATA);
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
     // Read one message.
     assertTrue(reader.start());
     assertEquals(DATA, reader.getCurrent().getBody());
     assertFalse(reader.advance());
+
     // ACK the message.
     UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
 
   @Test
-  public void testTimeoutAckAndRereadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
-    SqsClient sqsClient = embeddedSqsRestServer.getClient();
+  public void testAckDeletedMessage() throws IOException {
+    setupMessages(DATA);
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
+    // Read one message
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent().getBody());
+    String receiptHandle = reader.getCurrent().getReceiptHandle();
+    assertFalse(reader.advance());
+
+    // Simulate already ACKed message after re-delivery to different reader
+    testCase
+        .getClient()
+        .deleteMessage(b -> 
b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));
+
+    // Now ACK the message.
+    UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  @Ignore("Behavior of SQSRestServer is broken")
+  public void testExtendDeletedMessage() throws IOException {
+    setupMessages(DATA);
+    Clock clock = mock(Clock.class);
+    when(clock.millis()).thenReturn(currentTimeMillis());
+
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null, 
clock);
+
+    // Read one message
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent().getBody());
+
+    // Simulate already ACKed message after re-delivery to different reader
+    String receiptHandle = reader.getCurrent().getReceiptHandle();
+    testCase
+        .getClient()
+        .deleteMessage(b -> 
b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));
+
+    // Forward time to force extension of visibility
+    when(clock.millis()).thenReturn(currentTimeMillis() + 
reader.getVisibilityTimeoutMs() * 8 / 10);
+
+    // Advancing the reader will attempt extending the visibility
+    assertFalse(reader.advance());

Review comment:
       👍 

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSourceTest.java
##########
@@ -17,36 +17,20 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static org.mockito.Mockito.mock;
+
 import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedSource}. */
 @RunWith(JUnit4.class)

Review comment:
       The test doesn't use any `@Mock` annotations, so there's no need for the Mockito runner.

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -509,70 +524,87 @@ void maybeCloseClient() throws IOException {
     }
   }
 
-  /** delete the provided {@code messageIds} from SQS. */
+  /**
+   * Delete the provided {@code messageIds} from SQS in multiple batches. Each 
batch except the last
+   * one is of size {@code DELETE_BATCH_SIZE}. Message ids that already got 
removed from {@code
+   * inFlight} messages are ignored.
+   *
+   * <p>CAUTION: May be invoked from a separate thread.
+   */
   void delete(List<String> messageIds) throws IOException {
-    AtomicInteger counter = new AtomicInteger();
-    for (List<String> messageList :
-        messageIds.stream()
-            .collect(groupingBy(x -> counter.getAndIncrement() / 
DELETE_BATCH_SIZE))
-            .values()) {
-      deleteBatch(messageList);
+    ArrayList<String> receiptHandles = new ArrayList<>(DELETE_BATCH_SIZE);
+    for (String msgId : messageIds) {
+      InFlightState state = inFlight.get(msgId);
+      if (state == null) {
+        continue;
+      }
+      receiptHandles.add(state.receiptHandle);
+      if (receiptHandles.size() == DELETE_BATCH_SIZE) {
+        deleteBatch(receiptHandles);
+        receiptHandles.clear();
+      }
+    }
+    if (!receiptHandles.isEmpty()) {
+      deleteBatch(receiptHandles);
     }
+    deletedIds.add(messageIds);
   }
 
   /**
-   * delete the provided {@code messageIds} from SQS, blocking until all of 
the messages are
-   * deleted.
+   * Delete the provided {@code receiptHandles} from SQS. Blocking until all 
messages are deleted.
    *
    * <p>CAUTION: May be invoked from a separate thread.
-   *
-   * <p>CAUTION: Retains {@code messageIds}.
    */
-  private void deleteBatch(List<String> messageIds) throws IOException {
+  private void deleteBatch(List<String> receiptHandles) throws IOException {
     int retries = 0;
-    Map<String, String> pendingReceipts =
-        IntStream.range(0, messageIds.size())
-            .boxed()
-            .filter(i -> inFlight.containsKey(messageIds.get(i)))
-            .collect(toMap(Object::toString, i -> 
inFlight.get(messageIds.get(i)).receiptHandle));
 
-    while (!pendingReceipts.isEmpty()) {
+    FunctionWithIndex<String, DeleteMessageBatchRequestEntry> buildEntry =
+        (handle, id) ->
+            DeleteMessageBatchRequestEntry.builder()
+                .id(Long.toString(id))
+                .receiptHandle(handle)
+                .build();
+
+    Map<String, DeleteMessageBatchRequestEntry> pendingDeletes =
+        mapWithIndex(receiptHandles.stream(), buildEntry).collect(toMap(e -> 
e.id(), identity()));
+
+    while (!pendingDeletes.isEmpty()) {
 
       if (retries >= BATCH_OPERATION_MAX_RETIRES) {
         throw new IOException(
-            "Failed to extend visibility timeout for "
-                + pendingReceipts.size()
+            "Failed to delete "
+                + pendingDeletes.size()
                 + " messages after "
                 + retries
                 + " retries");
       }
 
-      List<DeleteMessageBatchRequestEntry> entries =
-          pendingReceipts.entrySet().stream()
-              .map(
-                  r ->
-                      DeleteMessageBatchRequestEntry.builder()
-                          .id(r.getKey())
-                          .receiptHandle(r.getValue())
-                          .build())
-              .collect(Collectors.toList());
-
       DeleteMessageBatchResponse result =
           sqsClient.deleteMessageBatch(
               DeleteMessageBatchRequest.builder()
-                  .queueUrl(source.getRead().queueUrl())
-                  .entries(entries)
+                  .queueUrl(queueUrl())
+                  .entries(pendingDeletes.values())
                   .build());
 
-      // Reflect failed message IDs to map
-      pendingReceipts
-          .keySet()
-          .retainAll(
-              
result.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toSet()));
+      Map<Boolean, Set<String>> failures =
+          result.failed().stream()
+              .collect(partitioningBy(this::isHandleInvalid, mapping(e -> 
e.id(), toSet())));
+
+      // Keep failed IDs only, but discard invalid receipt handles
+      pendingDeletes.keySet().retainAll(failures.getOrDefault(FALSE, 
ImmutableSet.of()));

Review comment:
       Exactly, they might have been redelivered to and deleted by another 
reader in the meantime ...

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
         // We'll try to track that on our side, but note the deadlines won't 
necessarily agree.
         long extensionMs = (int) ((visibilityTimeoutMs * 
VISIBILITY_EXTENSION_PCT) / 100L);
         long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+        List<KV<String, String>> messages = new 
ArrayList<>(toBeExtended.size());
+
         for (String messageId : toBeExtended) {
           // Maintain increasing ack deadline order.
-          String receiptHandle = inFlight.get(messageId).receiptHandle;
           InFlightState state = inFlight.remove(messageId);
-          inFlight.put(
-              messageId,
-              new InFlightState(
-                  receiptHandle, state.requestTimeMsSinceEpoch, 
newDeadlineMsSinceEpoch));
+          state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+          inFlight.put(messageId, state);
+          messages.add(KV.of(messageId, state.receiptHandle));
         }
-        List<String> receiptHandles =
-            toBeExtended.stream()
-                .map(inFlight::get)
-                .filter(Objects::nonNull) // get rid of null values
-                .map(m -> m.receiptHandle)
-                .collect(Collectors.toList());
+
         // BLOCKs until extended.
-        extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs / 
1000));
+        extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
       }
     }
   }
 
   /**
-   * BLOCKING Extend the visibility timeout for messages from SQS with the 
given {@code
-   * receiptHandles}.
+   * BLOCKING Set the SQS visibility timeout for messages in {@code 
receiptHandles} to zero for
+   * immediate redelivery.
+   */
+  void expireBatchForRedelivery(List<String> receiptHandles) throws 
IOException {

Review comment:
       The visibility timeout defines the point in time when an SQS message 
becomes available for redelivery again, unless it was deleted or extended in 
the meantime. Such messages are then delivered to whichever reader happens to 
poll for messages next.

##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
         // We'll try to track that on our side, but note the deadlines won't 
necessarily agree.
         long extensionMs = (int) ((visibilityTimeoutMs * 
VISIBILITY_EXTENSION_PCT) / 100L);
         long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+        List<KV<String, String>> messages = new 
ArrayList<>(toBeExtended.size());
+
         for (String messageId : toBeExtended) {
           // Maintain increasing ack deadline order.
-          String receiptHandle = inFlight.get(messageId).receiptHandle;
           InFlightState state = inFlight.remove(messageId);
-          inFlight.put(
-              messageId,
-              new InFlightState(
-                  receiptHandle, state.requestTimeMsSinceEpoch, 
newDeadlineMsSinceEpoch));
+          state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+          inFlight.put(messageId, state);
+          messages.add(KV.of(messageId, state.receiptHandle));
         }
-        List<String> receiptHandles =
-            toBeExtended.stream()
-                .map(inFlight::get)
-                .filter(Objects::nonNull) // get rid of null values
-                .map(m -> m.receiptHandle)
-                .collect(Collectors.toList());
+
         // BLOCKs until extended.
-        extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs / 
1000));
+        extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
       }
     }
   }
 
   /**
-   * BLOCKING Extend the visibility timeout for messages from SQS with the 
given {@code
-   * receiptHandles}.
+   * BLOCKING Set the SQS visibility timeout for messages in {@code 
receiptHandles} to zero for
+   * immediate redelivery.
+   */
+  void expireBatchForRedelivery(List<String> receiptHandles) throws 
IOException {
+    List<KV<String, String>> messages =
+        mapWithIndex(receiptHandles.stream(), (handle, idx) -> 
KV.of(Long.toString(idx), handle))
+            .collect(toList());
+
+    long nowMsSinceEpoch = now();
+    extendBatch(nowMsSinceEpoch, messages, 0);
+    numReleased.add(nowMsSinceEpoch, receiptHandles.size());
+  }
+
+  /**
+   * BLOCKING Extend the SQS visibility timeout for messages in {@code 
messages} as {@link KV} of
+   * message id, receipt handle.
    */
-  void extendBatch(long nowMsSinceEpoch, List<String> receiptHandles, int 
extensionSec)
+  void extendBatch(long nowMsSinceEpoch, List<KV<String, String>> messages, 
int extensionSec)

Review comment:
       The way it works is that SQS adds the visibility timeout onto the 
current time to determine the new point in time when a message becomes 
available for redelivery. So it's not a general timeout, but really an extension.

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
 import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)

Review comment:
       already replaced 👍 

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
 import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
 public class SqsUnboundedReaderTest {
   private static final String DATA = "testData";
 
-  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @ClassRule public static EmbeddedSqsServer sqsServer = new 
EmbeddedSqsServer();
 
-  @Rule public EmbeddedSqsServer embeddedSqsRestServer = new 
EmbeddedSqsServer();
+  @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
 
-  private SqsUnboundedSource source;
+  @Mock(answer = RETURNS_DEEP_STUBS)
+  public SqsUnboundedSource mockSource;
 
-  private void setupOneMessage() {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-    
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(1));
-  }
-
-  private void setupMessages(List<String> messages) {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+  private void setupMessages(String... messages) {
+    final SqsClient client = testCase.getClient();
+    final String queueUrl = testCase.getQueueUrl();
     for (String message : messages) {
-      client.sendMessage(
-          
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+      client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
     }
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(messages.size()));
+
+    
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+    when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
   }
 
   @Test
   public void testReadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
+    setupMessages(DATA);
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
     // Read one message.
     assertTrue(reader.start());
     assertEquals(DATA, reader.getCurrent().getBody());
     assertFalse(reader.advance());
+
     // ACK the message.
     UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
 
   @Test
-  public void testTimeoutAckAndRereadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
-    SqsClient sqsClient = embeddedSqsRestServer.getClient();
+  public void testAckDeletedMessage() throws IOException {
+    setupMessages(DATA);
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
+    // Read one message
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent().getBody());
+    String receiptHandle = reader.getCurrent().getReceiptHandle();
+    assertFalse(reader.advance());
+
+    // Simulate already ACKed message after re-delivery to different reader
+    testCase
+        .getClient()
+        .deleteMessage(b -> 
b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));
+
+    // Now ACK the message.
+    UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  @Ignore("Behavior of SQSRestServer is broken")

Review comment:
       done 👍 

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSourceTest.java
##########
@@ -17,36 +17,20 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static org.mockito.Mockito.mock;
+
 import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedSource}. */
 @RunWith(JUnit4.class)

Review comment:
       👍 

##########
File path: 
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java
##########
@@ -17,113 +17,155 @@
  */
 package org.apache.beam.sdk.io.aws2.sqs;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.IntStream.range;
 import static junit.framework.TestCase.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 
 /** Tests on {@link SqsUnboundedReader}. */
-@RunWith(JUnit4.class)
+@RunWith(MockitoJUnitRunner.class)
 public class SqsUnboundedReaderTest {
   private static final String DATA = "testData";
 
-  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @ClassRule public static EmbeddedSqsServer sqsServer = new 
EmbeddedSqsServer();
 
-  @Rule public EmbeddedSqsServer embeddedSqsRestServer = new 
EmbeddedSqsServer();
+  @Rule public TestCaseEnv testCase = new TestCaseEnv(sqsServer);
 
-  private SqsUnboundedSource source;
+  @Mock(answer = RETURNS_DEEP_STUBS)
+  public SqsUnboundedSource mockSource;
 
-  private void setupOneMessage() {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
-    
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(DATA).build());
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(1));
-  }
-
-  private void setupMessages(List<String> messages) {
-    final SqsClient client = embeddedSqsRestServer.getClient();
-    final String queueUrl = embeddedSqsRestServer.getQueueUrl();
+  private void setupMessages(String... messages) {
+    final SqsClient client = testCase.getClient();
+    final String queueUrl = testCase.getQueueUrl();
     for (String message : messages) {
-      client.sendMessage(
-          
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build());
+      client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message));
     }
-    source =
-        new SqsUnboundedSource(
-            SqsIO.read()
-                .withQueueUrl(queueUrl)
-                .withSqsClientProvider(SqsClientProviderMock.of(client))
-                .withMaxNumRecords(messages.size()));
+
+    
when(mockSource.getRead().sqsClientProvider()).thenReturn(SqsClientProviderMock.of(client));
+    when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
   }
 
   @Test
   public void testReadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
+    setupMessages(DATA);
+    SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null);
+
     // Read one message.
     assertTrue(reader.start());
     assertEquals(DATA, reader.getCurrent().getBody());
     assertFalse(reader.advance());
+
     // ACK the message.
     UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
     checkpoint.finalizeCheckpoint();
     reader.close();
   }
 
   @Test
-  public void testTimeoutAckAndRereadOneMessage() throws IOException {
-    setupOneMessage();
-    UnboundedSource.UnboundedReader<SqsMessage> reader =
-        source.createReader(pipeline.getOptions(), null);
-    SqsClient sqsClient = embeddedSqsRestServer.getClient();
+  public void testAckDeletedMessage() throws IOException {

Review comment:
       Done 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to