This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3f486ff8efe1233bb6c021e1eee441a317c9875f Author: Matthias Pohl <[email protected]> AuthorDate: Fri Aug 25 15:54:54 2023 +0200 [hotfix][test] Refactors test to remove version and offset server-side checks The version and offset checks are not really necessary for the server-side processing because they are verifying test code (the ServerThread handles the forwarding of the version and the setting of the offset). Therefore, we can also remove those checks. --- .../CollectSinkOperatorCoordinatorTest.java | 102 ++++++++++++++------- 1 file changed, 69 insertions(+), 33 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java index d11e5712cdf..f6948dd9659 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils; import org.apache.flink.types.Row; @@ -57,10 +58,11 @@ public class CollectSinkOperatorCoordinatorTest { new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS); coordinator.start(); - CollectCoordinationRequest request = new CollectCoordinationRequest("version", 123); - CollectCoordinationResponse response = - (CollectCoordinationResponse) 
coordinator.handleCoordinationRequest(request).get(); - assertResponseEquals(request, response, -1, Collections.emptyList()); + final String expectedVersion = "version"; + final CompletableFuture<CoordinationResponse> response = + coordinator.handleCoordinationRequest( + createRequestForClientGeneratedResponse(expectedVersion)); + assertEmptyResponseGeneratedFromClient(response, expectedVersion); coordinator.close(); } @@ -81,21 +83,19 @@ public class CollectSinkOperatorCoordinatorTest { 0, 0, new CollectSinkAddressEvent(server.getServerAddress())); // a normal response - CollectCoordinationRequest request = new CollectCoordinationRequest("version1", 123); - CollectCoordinationResponse response = - (CollectCoordinationResponse) coordinator.handleCoordinationRequest(request).get(); - assertResponseEquals(request, response, 0, expected.get(0)); + CompletableFuture<CoordinationResponse> response = + coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse()); + assertResponseWithDefaultMetadataFromServer(response, expected.get(0)); // a normal response - request = new CollectCoordinationRequest("version2", 456); - response = - (CollectCoordinationResponse) coordinator.handleCoordinationRequest(request).get(); - assertResponseEquals(request, response, 0, expected.get(1)); + response = coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse()); + assertResponseWithDefaultMetadataFromServer(response, expected.get(1)); // server closes here - request = new CollectCoordinationRequest("version3", 789); + final String expectedVersion = "version3"; CompletableFuture<CoordinationResponse> responseFuture = - coordinator.handleCoordinationRequest(request); + coordinator.handleCoordinationRequest( + createRequestForClientGeneratedResponse(expectedVersion)); coordinator.executionAttemptFailed(0, 0, null); // new server comes @@ -106,27 +106,59 @@ public class CollectSinkOperatorCoordinatorTest { 0, 0, new 
CollectSinkAddressEvent(server.getServerAddress())); // check failed request - response = (CollectCoordinationResponse) responseFuture.get(); - assertResponseEquals(request, response, -1, Collections.emptyList()); + assertEmptyResponseGeneratedFromClient(responseFuture, expectedVersion); // a normal response - request = new CollectCoordinationRequest("version4", 101112); - response = - (CollectCoordinationResponse) coordinator.handleCoordinationRequest(request).get(); - assertResponseEquals(request, response, 0, expected.get(0)); + response = coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse()); + assertResponseWithDefaultMetadataFromServer(response, expected.get(0)); server.close(); coordinator.close(); } - private void assertResponseEquals( - CollectCoordinationRequest request, - CollectCoordinationResponse response, - long expectedLastCheckpointedOffset, + private static CoordinationRequest createRequestForServerGeneratedResponse() { + final String unusedVersion = "random-version"; + return createRequestForClientGeneratedResponse(unusedVersion); + } + + private static CoordinationRequest createRequestForClientGeneratedResponse(String version) { + final int unusedOffset = 123; + return new CollectCoordinationRequest(version, unusedOffset); + } + + private static void assertEmptyResponseGeneratedFromServer( + CompletableFuture<CoordinationResponse> responseFuture) throws Exception { + assertEmptyResponseGeneratedFromClient( + responseFuture, ServerThread.DEFAULT_SERVER_RESPONSE_VERSION); + } + + private static void assertEmptyResponseGeneratedFromClient( + CompletableFuture<CoordinationResponse> responseFuture, String expectedVersion) + throws Exception { + assertResponse(responseFuture, expectedVersion, -1, Collections.emptyList()); + } + + private static void assertResponseWithDefaultMetadataFromServer( + CompletableFuture<CoordinationResponse> responseFuture, List<Row> expectedData) + throws Exception { + assertResponse( + 
responseFuture, + ServerThread.DEFAULT_SERVER_RESPONSE_VERSION, + ServerThread.DEFAULT_SERVER_RESPONSE_OFFSET, + expectedData); + } + + private static void assertResponse( + CompletableFuture<CoordinationResponse> responseFuture, + String expectedVersion, + long expectedOffset, List<Row> expectedResults) throws Exception { - Assert.assertEquals(request.getVersion(), response.getVersion()); - Assert.assertEquals(expectedLastCheckpointedOffset, response.getLastCheckpointedOffset()); + final CollectCoordinationResponse response = + (CollectCoordinationResponse) responseFuture.get(); + + Assert.assertEquals(expectedVersion, response.getVersion()); + Assert.assertEquals(expectedOffset, response.getLastCheckpointedOffset()); List<Row> results = response.getResults(serializer); Assert.assertEquals(expectedResults.size(), results.size()); for (int i = 0; i < results.size(); i++) { @@ -141,6 +173,9 @@ public class CollectSinkOperatorCoordinatorTest { private static class ServerThread extends Thread { + static final String DEFAULT_SERVER_RESPONSE_VERSION = "server-response-version"; + static final int DEFAULT_SERVER_RESPONSE_OFFSET = 2; + private final LinkedList<List<Row>> data; private final int closeRequestNum; @@ -171,7 +206,8 @@ public class CollectSinkOperatorCoordinatorTest { outStream = new DataOutputViewStreamWrapper(socket.getOutputStream()); } - CollectCoordinationRequest request = new CollectCoordinationRequest(inStream); + // parsing the request to ensure correct format of input message + new CollectCoordinationRequest(inStream); requestNum++; if (requestNum >= closeRequestNum) { @@ -180,12 +216,12 @@ public class CollectSinkOperatorCoordinatorTest { break; } - CollectCoordinationResponse response = - new CollectCoordinationResponse( - request.getVersion(), - 0, - CollectTestUtils.toBytesList(data.removeFirst(), serializer)); - response.serialize(outStream); + // serialize generic response (only the data is relevant) + new CollectCoordinationResponse( + 
DEFAULT_SERVER_RESPONSE_VERSION, + DEFAULT_SERVER_RESPONSE_OFFSET, + CollectTestUtils.toBytesList(data.removeFirst(), serializer)) + .serialize(outStream); } socket.close();
