[
https://issues.apache.org/jira/browse/BEAM-6263?focusedWorklogId=176932&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176932
]
ASF GitHub Bot logged work on BEAM-6263:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Dec/18 11:12
Start Date: 19/Dec/18 11:12
Worklog Time Spent: 10m
Work Description: mxm closed pull request #7309: [BEAM-6263] Fix
error-prone test setup for FlinkJobServerDriver
URL: https://github.com/apache/beam/pull/7309
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index de30eda2a8bc..be36a5dc94fc 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -49,8 +49,8 @@
@VisibleForTesting ServerConfiguration configuration;
private final ServerFactory jobServerFactory;
private final ServerFactory artifactServerFactory;
- private GrpcFnServer<InMemoryJobService> jobServer;
- private GrpcFnServer<BeamFileSystemArtifactStagingService>
artifactStagingServer;
+ private volatile GrpcFnServer<InMemoryJobService> jobServer;
+ private volatile GrpcFnServer<BeamFileSystemArtifactStagingService>
artifactStagingServer;
/** Configuration for the jobServer. */
public static class ServerConfiguration {
@@ -184,12 +184,16 @@ public void run() {
}
}
+ // This method is executed by TestPortableRunner via Reflection
public String start() throws IOException {
jobServer = createJobServer();
return jobServer.getApiServiceDescriptor().getUrl();
}
- public void stop() {
+ // This method is executed by TestPortableRunner via Reflection
+ // Needs to be synchronized to prevent concurrency issues in testing shutdown
+ @SuppressWarnings("WeakerAccess")
+ public synchronized void stop() {
if (jobServer != null) {
try {
jobServer.close();
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
index fab70c7d9b97..23189c7d3dbc 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
@@ -24,9 +24,7 @@
import com.google.common.base.Charsets;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.PrintStream;
-import java.net.ServerSocket;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,33 +82,30 @@ public void testConfigurationFromConfig() {
public void testJobServerDriver() throws Exception {
FlinkJobServerDriver driver = null;
Thread driverThread = null;
- final PrintStream oldOut = System.out;
+ final PrintStream oldOut = System.err;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream newOut = new PrintStream(baos);
try {
System.setErr(newOut);
- int freePort = getFreePort();
- int freePort2 = getFreePort();
- driver =
- FlinkJobServerDriver.fromParams(
- new String[] {
- "--job-port", String.valueOf(freePort),
- "--artifact-port", String.valueOf(freePort2)
- });
+ driver = FlinkJobServerDriver.fromParams(new String[] {"--job-port=0",
"--artifact-port=0"});
driverThread = new Thread(driver);
driverThread.start();
boolean success = false;
while (!success) {
newOut.flush();
String output = baos.toString(Charsets.UTF_8.name());
- if (output.contains("JobService started on localhost:" + freePort)
- && output.contains("ArtifactStagingService started on localhost:"
+ freePort2)) {
+ if (output.contains("JobService started on localhost:")
+ && output.contains("ArtifactStagingService started on
localhost:")) {
success = true;
} else {
Thread.sleep(100);
}
}
assertThat(driverThread.isAlive(), is(true));
+ } catch (Throwable t) {
+ // restore to print exception
+ System.setErr(oldOut);
+ throw t;
} finally {
System.setErr(oldOut);
if (driver != null) {
@@ -122,10 +117,4 @@ public void testJobServerDriver() throws Exception {
}
}
}
-
- private static int getFreePort() throws IOException {
- try (ServerSocket socket = new ServerSocket(0)) {
- return socket.getLocalPort();
- }
- }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 176932)
Time Spent: 40m (was: 0.5h)
> FlinkJobServerDriver test setup is error-prone
> ----------------------------------------------
>
> Key: BEAM-6263
> URL: https://issues.apache.org/jira/browse/BEAM-6263
> Project: Beam
> Issue Type: Test
> Components: runner-flink
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.10.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> I came across several issues in the test setup:
> * Stderr is unintentionally redirected to stdout
> * Exceptions can be swallowed, which makes debugging hard
> * Port allocations can collide during parallel tests
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)