[
https://issues.apache.org/jira/browse/BEAM-4088?focusedWorklogId=142484&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142484
]
ASF GitHub Bot logged work on BEAM-4088:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Sep/18 23:46
Start Date: 08/Sep/18 23:46
Worklog Time Spent: 10m
Work Description: stale[bot] closed pull request #5280: [BEAM-4088]
Instrument ExecutorServiceParallelExecutorTest
URL: https://github.com/apache/beam/pull/5280
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
index 9a78a044fc0..d4b5ba12259 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
@@ -23,8 +23,13 @@
import com.google.common.collect.LinkedListMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.metrics.MetricsPusherTest;
import org.apache.beam.sdk.io.GenerateSequence;
@@ -46,22 +51,24 @@
public class ExecutorServiceParallelExecutorTest {
private static final long NUM_ELEMENTS = 1000L;
-
+ @Rule public final TestName testName = new TestName();
private final TestPipeline pipeline = TestPipeline.create();
private final TestRule threadLeakTracker = new ThreadLeakTracker();
-
@Rule public final TestRule execution =
outerRule(pipeline).around(threadLeakTracker);
- @Rule public final TestName testName = new TestName();
-
@Test
@Ignore("https://issues.apache.org/jira/browse/BEAM-4088 Test reliably
fails.")
public void ensureMetricsThreadDoesntLeak() {
final DirectGraph graph =
DirectGraph.create(
emptyMap(), emptyMap(), LinkedListMultimap.create(), emptySet(),
emptyMap());
- final ExecutorService metricsExecutorService =
- Executors.newSingleThreadExecutor(
+ final InstrumentedThreadPoolExecutor metricsExecutorService =
+ new InstrumentedThreadPoolExecutor(
+ 1,
+ 1,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue(),
new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("dontleak_" + getClass().getName() + "#" +
testName.getMethodName())
@@ -91,6 +98,19 @@ public void ensureMetricsThreadDoesntLeak() {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
+ if (!metricsExecutorService.isTerminated()) {
+ if (metricsExecutorService.isTerminating()) {
+ throw new RuntimeException(
+ String.format(
+ "metricsExecutorService is terminating but still has %s
threads. History is: %s",
+ metricsExecutorService.getPoolSize(),
metricsExecutorService.showMessages()));
+ } else {
+ throw new RuntimeException(
+ String.format(
+ "metricsExecutorService should be terminating. History is: %s",
+ metricsExecutorService.showMessages()));
+ }
+ }
}
@Test
@@ -100,7 +120,45 @@ public void testNoThreadsLeakInPipelineExecution() {
pipeline.run();
}
+ private static class InstrumentedThreadPoolExecutor extends
ThreadPoolExecutor {
+
+ private List<String> messages = new ArrayList<>();
+
+ private InstrumentedThreadPoolExecutor(
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ }
+
+ @Override
+ public void shutdown() {
+ messages.add(String.format("shutdown %s " + Instant.now()));
+ super.shutdown();
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ super.execute(command);
+ messages.add(String.format("after execute %s " + Instant.now()));
+ }
+
+ @Override
+ protected void terminated() {
+ messages.add(String.format("terminated %s " + Instant.now()));
+ super.terminated();
+ }
+
+ public String showMessages() {
+ return Arrays.toString(messages.toArray());
+ }
+ }
+
private static class CountingDoFn extends DoFn<Long, Long> {
+
private final Counter counter = Metrics.counter(MetricsPusherTest.class,
"counter");
@ProcessElement
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 142484)
Time Spent: 3.5h (was: 3h 20m)
> ExecutorServiceParallelExecutorTest#ensureMetricsThreadDoesntLeak in PR #4965
> does not pass in gradle
> -----------------------------------------------------------------------------------------------------
>
> Key: BEAM-4088
> URL: https://issues.apache.org/jira/browse/BEAM-4088
> Project: Beam
> Issue Type: Sub-task
> Components: testing
> Reporter: Etienne Chauchot
> Assignee: Etienne Chauchot
> Priority: Major
> Fix For: Not applicable
>
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> This is a new test being added to ensure threads don't leak. The failure
> seems to indicate that threads do leak.
> This test fails using gradle but previously passed using maven
> PR: https://github.com/apache/beam/pull/4965
> Presubmit Failure:
> * https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/4059/
> *
> https://scans.gradle.com/s/grha56432j3t2/tests/jqhvlvf72f7pg-ipde5etqqejoa?openStackTraces=WzBd
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)