This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f486ff8efe1233bb6c021e1eee441a317c9875f
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Aug 25 15:54:54 2023 +0200

    [hotfix][test] Refactors test to remove version and offset server-side 
checks
    
    The version and offset check is not really necessary for the server-side 
processing because it's
    verifying test code (the ServerThread handles the forwarding of the version 
and the setting of the
    offset). Therefore, we can also remove those checks.
---
 .../CollectSinkOperatorCoordinatorTest.java        | 102 ++++++++++++++-------
 1 file changed, 69 insertions(+), 33 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index d11e5712cdf..f6948dd9659 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
 import org.apache.flink.types.Row;
@@ -57,10 +58,11 @@ public class CollectSinkOperatorCoordinatorTest {
                 new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
         coordinator.start();
 
-        CollectCoordinationRequest request = new 
CollectCoordinationRequest("version", 123);
-        CollectCoordinationResponse response =
-                (CollectCoordinationResponse) 
coordinator.handleCoordinationRequest(request).get();
-        assertResponseEquals(request, response, -1, Collections.emptyList());
+        final String expectedVersion = "version";
+        final CompletableFuture<CoordinationResponse> response =
+                coordinator.handleCoordinationRequest(
+                        
createRequestForClientGeneratedResponse(expectedVersion));
+        assertEmptyResponseGeneratedFromClient(response, expectedVersion);
 
         coordinator.close();
     }
@@ -81,21 +83,19 @@ public class CollectSinkOperatorCoordinatorTest {
                 0, 0, new CollectSinkAddressEvent(server.getServerAddress()));
 
         // a normal response
-        CollectCoordinationRequest request = new 
CollectCoordinationRequest("version1", 123);
-        CollectCoordinationResponse response =
-                (CollectCoordinationResponse) 
coordinator.handleCoordinationRequest(request).get();
-        assertResponseEquals(request, response, 0, expected.get(0));
+        CompletableFuture<CoordinationResponse> response =
+                
coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse());
+        assertResponseWithDefaultMetadataFromServer(response, expected.get(0));
 
         // a normal response
-        request = new CollectCoordinationRequest("version2", 456);
-        response =
-                (CollectCoordinationResponse) 
coordinator.handleCoordinationRequest(request).get();
-        assertResponseEquals(request, response, 0, expected.get(1));
+        response = 
coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse());
+        assertResponseWithDefaultMetadataFromServer(response, expected.get(1));
 
         // server closes here
-        request = new CollectCoordinationRequest("version3", 789);
+        final String expectedVersion = "version3";
         CompletableFuture<CoordinationResponse> responseFuture =
-                coordinator.handleCoordinationRequest(request);
+                coordinator.handleCoordinationRequest(
+                        
createRequestForClientGeneratedResponse(expectedVersion));
         coordinator.executionAttemptFailed(0, 0, null);
 
         // new server comes
@@ -106,27 +106,59 @@ public class CollectSinkOperatorCoordinatorTest {
                 0, 0, new CollectSinkAddressEvent(server.getServerAddress()));
 
         // check failed request
-        response = (CollectCoordinationResponse) responseFuture.get();
-        assertResponseEquals(request, response, -1, Collections.emptyList());
+        assertEmptyResponseGeneratedFromClient(responseFuture, 
expectedVersion);
 
         // a normal response
-        request = new CollectCoordinationRequest("version4", 101112);
-        response =
-                (CollectCoordinationResponse) 
coordinator.handleCoordinationRequest(request).get();
-        assertResponseEquals(request, response, 0, expected.get(0));
+        response = 
coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse());
+        assertResponseWithDefaultMetadataFromServer(response, expected.get(0));
 
         server.close();
         coordinator.close();
     }
 
-    private void assertResponseEquals(
-            CollectCoordinationRequest request,
-            CollectCoordinationResponse response,
-            long expectedLastCheckpointedOffset,
+    private static CoordinationRequest 
createRequestForServerGeneratedResponse() {
+        final String unusedVersion = "random-version";
+        return createRequestForClientGeneratedResponse(unusedVersion);
+    }
+
+    private static CoordinationRequest 
createRequestForClientGeneratedResponse(String version) {
+        final int unusedOffset = 123;
+        return new CollectCoordinationRequest(version, unusedOffset);
+    }
+
+    private static void assertEmptyResponseGeneratedFromServer(
+            CompletableFuture<CoordinationResponse> responseFuture) throws 
Exception {
+        assertEmptyResponseGeneratedFromClient(
+                responseFuture, ServerThread.DEFAULT_SERVER_RESPONSE_VERSION);
+    }
+
+    private static void assertEmptyResponseGeneratedFromClient(
+            CompletableFuture<CoordinationResponse> responseFuture, String 
expectedVersion)
+            throws Exception {
+        assertResponse(responseFuture, expectedVersion, -1, 
Collections.emptyList());
+    }
+
+    private static void assertResponseWithDefaultMetadataFromServer(
+            CompletableFuture<CoordinationResponse> responseFuture, List<Row> 
expectedData)
+            throws Exception {
+        assertResponse(
+                responseFuture,
+                ServerThread.DEFAULT_SERVER_RESPONSE_VERSION,
+                ServerThread.DEFAULT_SERVER_RESPONSE_OFFSET,
+                expectedData);
+    }
+
+    private static void assertResponse(
+            CompletableFuture<CoordinationResponse> responseFuture,
+            String expectedVersion,
+            long expectedOffset,
             List<Row> expectedResults)
             throws Exception {
-        Assert.assertEquals(request.getVersion(), response.getVersion());
-        Assert.assertEquals(expectedLastCheckpointedOffset, 
response.getLastCheckpointedOffset());
+        final CollectCoordinationResponse response =
+                (CollectCoordinationResponse) responseFuture.get();
+
+        Assert.assertEquals(expectedVersion, response.getVersion());
+        Assert.assertEquals(expectedOffset, 
response.getLastCheckpointedOffset());
         List<Row> results = response.getResults(serializer);
         Assert.assertEquals(expectedResults.size(), results.size());
         for (int i = 0; i < results.size(); i++) {
@@ -141,6 +173,9 @@ public class CollectSinkOperatorCoordinatorTest {
 
     private static class ServerThread extends Thread {
 
+        static final String DEFAULT_SERVER_RESPONSE_VERSION = 
"server-response-version";
+        static final int DEFAULT_SERVER_RESPONSE_OFFSET = 2;
+
         private final LinkedList<List<Row>> data;
         private final int closeRequestNum;
 
@@ -171,7 +206,8 @@ public class CollectSinkOperatorCoordinatorTest {
                         outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
                     }
 
-                    CollectCoordinationRequest request = new 
CollectCoordinationRequest(inStream);
+                    // parsing the request to ensure correct format of input 
message
+                    new CollectCoordinationRequest(inStream);
 
                     requestNum++;
                     if (requestNum >= closeRequestNum) {
@@ -180,12 +216,12 @@ public class CollectSinkOperatorCoordinatorTest {
                         break;
                     }
 
-                    CollectCoordinationResponse response =
-                            new CollectCoordinationResponse(
-                                    request.getVersion(),
-                                    0,
-                                    
CollectTestUtils.toBytesList(data.removeFirst(), serializer));
-                    response.serialize(outStream);
+                    // serialize generic response (only the data is relevant)
+                    new CollectCoordinationResponse(
+                                    DEFAULT_SERVER_RESPONSE_VERSION,
+                                    DEFAULT_SERVER_RESPONSE_OFFSET,
+                                    
CollectTestUtils.toBytesList(data.removeFirst(), serializer))
+                            .serialize(outStream);
                 }
 
                 socket.close();

Reply via email to