[
https://issues.apache.org/jira/browse/BEAM-4088?focusedWorklogId=96707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96707
]
ASF GitHub Bot logged work on BEAM-4088:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Apr/18 14:13
Start Date: 30/Apr/18 14:13
Worklog Time Spent: 10m
Work Description: echauchot closed pull request #5231: [BEAM-4088] fix
thread leak in tests on PR #4965
URL: https://github.com/apache/beam/pull/5231
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index 4ef1c61ffbf..0656e147b4a 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -21,15 +21,13 @@
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
@@ -52,15 +50,6 @@
*/
class DirectMetrics extends MetricResults {
- // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in
the DirectRunner.
- private static final ExecutorService COUNTER_COMMITTER =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setThreadFactory(MoreExecutors.platformThreadFactory())
- .setDaemon(true)
- .setNameFormat("direct-metrics-counter-committer")
- .build());
-
private interface MetricAggregation<UpdateT, ResultT> {
UpdateT zero();
UpdateT combine(Iterable<UpdateT> updates);
@@ -76,6 +65,8 @@
private static class DirectMetric<UpdateT, ResultT> {
private final MetricAggregation<UpdateT, ResultT> aggregation;
+ private final Executor executor;
+
private final AtomicReference<UpdateT> finishedCommitted;
private final Object attemptedLock = new Object();
@@ -85,8 +76,10 @@
private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted
=
new ConcurrentHashMap<>();
- public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) {
+ public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation,
+ Executor executor) {
this.aggregation = aggregation;
+ this.executor = executor;
finishedCommitted = new AtomicReference<>(aggregation.zero());
finishedAttempted = aggregation.zero();
}
@@ -115,7 +108,7 @@ public void commitPhysical(final CommittedBundle<?> bundle,
final UpdateT finalC
// 2. We submit a runnable that will commit the update and remove the
tentative value in
// a synchronized block.
inflightAttempted.put(bundle, finalCumulative);
- COUNTER_COMMITTER.submit(
+ executor.execute(
() -> {
synchronized (attemptedLock) {
finishedAttempted =
aggregation.combine(asList(finishedAttempted, finalCumulative));
@@ -223,13 +216,12 @@ public GaugeResult extract(GaugeData data) {
};
/** The current values of counters in memory. */
- private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters =
- new MetricsMap<>(unusedKey -> new DirectMetric<>(COUNTER));
+ private final MetricsMap<MetricKey, DirectMetric<Long, Long>> counters;
+
+ private final MetricsMap<MetricKey, DirectMetric<DistributionData,
DistributionResult>>
+ distributions;
- private MetricsMap<MetricKey, DirectMetric<DistributionData,
DistributionResult>> distributions =
- new MetricsMap<>(unusedKey -> new DirectMetric<>(DISTRIBUTION));
- private MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauges =
- new MetricsMap<>(unusedKey -> new DirectMetric<>(GAUGE));
+ private final MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>>
gauges;
@AutoValue
abstract static class DirectMetricQueryResults implements MetricQueryResults
{
@@ -256,6 +248,13 @@ public static MetricQueryResults create(
}
}
+ DirectMetrics(ExecutorService executorService) {
+ this.counters = new MetricsMap<>(unusedKey -> new DirectMetric<>(COUNTER,
executorService));
+ this.distributions =
+ new MetricsMap<>(unusedKey -> new DirectMetric<>(DISTRIBUTION,
executorService));
+ this.gauges = new MetricsMap<>(unusedKey -> new DirectMetric<>(GAUGE,
executorService));
+ }
+
@Override
public MetricQueryResults queryMetrics(MetricsFilter filter) {
ImmutableList.Builder<MetricResult<Long>> counterResults =
ImmutableList.builder();
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 10b88248bd5..6329c55d813 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -22,6 +22,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -29,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -165,23 +169,30 @@ public DirectPipelineResult run(Pipeline
originalPipeline) {
}
pipeline.replaceAll(defaultTransformOverrides());
MetricsEnvironment.setMetricsSupported(true);
- DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
- pipeline.traverseTopologically(graphVisitor);
+ try {
+ DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
+ pipeline.traverseTopologically(graphVisitor);
- @SuppressWarnings("rawtypes")
- KeyedPValueTrackingVisitor keyedPValueVisitor =
KeyedPValueTrackingVisitor.create();
- pipeline.traverseTopologically(keyedPValueVisitor);
+ @SuppressWarnings("rawtypes")
+ KeyedPValueTrackingVisitor keyedPValueVisitor =
KeyedPValueTrackingVisitor.create();
+ pipeline.traverseTopologically(keyedPValueVisitor);
DisplayDataValidator.validatePipeline(pipeline);
DisplayDataValidator.validateOptions(options);
- DirectGraph graph = graphVisitor.getGraph();
- EvaluationContext context =
- EvaluationContext.create(
- clockSupplier.get(),
- Enforcement.bundleFactoryFor(enabledEnforcements, graph),
- graph,
- keyedPValueVisitor.getKeyedPValues());
+ ExecutorService metricsPool = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setThreadFactory(MoreExecutors.platformThreadFactory())
+ .setDaemon(false) // otherwise you say you want to leak, please
don't!
+ .setNameFormat("direct-metrics-counter-committer")
+ .build());
+ DirectGraph graph = graphVisitor.getGraph();
+ EvaluationContext context =
+ EvaluationContext.create(
+ clockSupplier.get(),
+ Enforcement.bundleFactoryFor(enabledEnforcements, graph),
+ graph,
+ keyedPValueVisitor.getKeyedPValues(), metricsPool);
TransformEvaluatorRegistry registry = TransformEvaluatorRegistry
.javaSdkNativeRegistry(context, options);
@@ -190,23 +201,26 @@ public DirectPipelineResult run(Pipeline
originalPipeline) {
options.getTargetParallelism(),
registry,
Enforcement.defaultModelEnforcements(enabledEnforcements),
- context);
+ context, metricsPool);
executor.start(graph, RootProviderRegistry.javaNativeRegistry(context,
options));
- DirectPipelineResult result = new DirectPipelineResult(executor, context);
- if (options.isBlockOnRun()) {
- try {
- result.waitUntilFinish();
- } catch (UserCodeException userException) {
- throw new PipelineExecutionException(userException.getCause());
- } catch (Throwable t) {
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
+ DirectPipelineResult result = new DirectPipelineResult(executor,
context);
+ if (options.isBlockOnRun()) {
+ try {
+ result.waitUntilFinish();
+ } catch (UserCodeException userException) {
+ throw new PipelineExecutionException(userException.getCause());
+ } catch (Throwable t) {
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ throw new RuntimeException(t);
}
- throw new RuntimeException(t);
}
+ return result;
+ } finally {
+ MetricsEnvironment.setMetricsSupported(false);
}
- return result;
}
/**
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 027e661f5bb..75fdbae351b 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -93,29 +94,34 @@
private final Set<PValue> keyedPValues;
+ private final ExecutorService executorService;
+
public static EvaluationContext create(
Clock clock,
BundleFactory bundleFactory,
DirectGraph graph,
- Set<PValue> keyedPValues) {
- return new EvaluationContext(clock, bundleFactory, graph, keyedPValues);
+ Set<PValue> keyedPValues,
+ ExecutorService executorService) {
+ return new EvaluationContext(clock, bundleFactory, graph, keyedPValues,
executorService);
}
private EvaluationContext(
Clock clock,
BundleFactory bundleFactory,
DirectGraph graph,
- Set<PValue> keyedPValues) {
+ Set<PValue> keyedPValues,
+ ExecutorService executorService) {
this.clock = clock;
this.bundleFactory = checkNotNull(bundleFactory);
this.graph = checkNotNull(graph);
this.keyedPValues = keyedPValues;
+ this.executorService = executorService;
this.watermarkManager = WatermarkManager.create(clock, graph);
this.sideInputContainer = SideInputContainer.create(this,
graph.getViews());
this.applicationStateInternals = new ConcurrentHashMap<>();
- this.metrics = new DirectMetrics();
+ this.metrics = new DirectMetrics(executorService);
this.callbackExecutor =
WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
}
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index cce4d7e1947..2ccea0e66bd 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -73,23 +73,28 @@
private final QueueMessageReceiver visibleUpdates;
+ private final ExecutorService metricsExecutor;
+
private AtomicReference<State> pipelineState = new
AtomicReference<>(State.RUNNING);
public static ExecutorServiceParallelExecutor create(
int targetParallelism,
TransformEvaluatorRegistry registry,
Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
- EvaluationContext context) {
+ EvaluationContext context,
+ ExecutorService metricsExecutor) {
return new ExecutorServiceParallelExecutor(
- targetParallelism, registry, transformEnforcements, context);
+ targetParallelism, registry, transformEnforcements, context,
metricsExecutor);
}
private ExecutorServiceParallelExecutor(
int targetParallelism,
TransformEvaluatorRegistry registry,
Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
- EvaluationContext context) {
+ EvaluationContext context,
+ ExecutorService metricsExecutor) {
this.targetParallelism = targetParallelism;
+ this.metricsExecutor = metricsExecutor;
// Don't use Daemon threads for workers. The Pipeline should continue to
execute even if there
// are no other active threads (for example, because waitUntilFinish was
not called)
this.executorService =
@@ -300,6 +305,11 @@ private void shutdownIfNecessary(State newState) {
} catch (final RuntimeException re) {
errors.add(re);
}
+ try {
+ metricsExecutor.shutdown();
+ } catch (final RuntimeException re) {
+ errors.add(re);
+ }
try {
registry.cleanup();
} catch (final Exception e) {
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 751fc942bed..acf5e1ce4fd 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -26,6 +26,8 @@
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
@@ -37,6 +39,7 @@
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.joda.time.Instant;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -60,11 +63,19 @@
private static final MetricName NAME3 = MetricName.named("ns2", "name1");
private static final MetricName NAME4 = MetricName.named("ns2", "name2");
- private DirectMetrics metrics = new DirectMetrics();
+ private DirectMetrics metrics;
+ private ExecutorService executor;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ executor = Executors.newSingleThreadExecutor();
+ metrics = new DirectMetrics(executor);
+ }
+
+ @After
+ public void tearDown() {
+ executor.shutdownNow();
}
@SuppressWarnings("unchecked")
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 800da7d57ab..81e0b98c69d 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -30,6 +30,7 @@
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateNamespaces;
@@ -118,7 +119,8 @@ public void setup() {
NanosOffsetClock.create(),
bundleFactory,
graph,
- keyedPValueTrackingVisitor.getKeyedPValues());
+ keyedPValueTrackingVisitor.getKeyedPValues(),
+ Executors.newSingleThreadExecutor());
createdProducer = graph.getProducer(created);
downstreamProducer = graph.getProducer(downstream);
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
new file mode 100644
index 00000000000..33cf619a753
--- /dev/null
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static org.junit.rules.RuleChain.outerRule;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.metrics.MetricsPusherTest;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ThreadLeakTracker;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+/**
+ * Validates some basic behavior for the ExecutorServiceParallelExecutor.
+ */
+public class ExecutorServiceParallelExecutorTest {
+
+ private static final long NUM_ELEMENTS = 1000L;
+
+ private final TestPipeline pipeline = TestPipeline.create();
+ private final TestRule threadLeakTracker = new ThreadLeakTracker();
+
+ @Rule
+ public final TestRule execution =
outerRule(pipeline).around(threadLeakTracker);
+
+ @Rule
+ public final TestName testName = new TestName();
+
+ @Test
+ public void ensureMetricsThreadDoesntLeak() {
+ final DirectGraph graph = DirectGraph.create(
+ emptyMap(), emptyMap(), LinkedListMultimap.create(),
+ emptySet(), emptyMap());
+ final ExecutorService metricsExecutorService =
Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(false)
+ .setNameFormat("dontleak_" + getClass().getName() + "#" +
testName.getMethodName())
+ .build());
+
+ // fake a metrics usage
+ metricsExecutorService.submit(() -> {});
+
+ final EvaluationContext context = EvaluationContext.create(
+ MockClock.fromInstant(Instant.now()),
+ CloningBundleFactory.create(), graph, emptySet(),
metricsExecutorService);
+ ExecutorServiceParallelExecutor
+ .create(
+ 2, TransformEvaluatorRegistry.javaSdkNativeRegistry(context,
+ PipelineOptionsFactory.create().as(DirectOptions.class)),
emptyMap(),
+ context,
+ metricsExecutorService)
+ .stop();
+ try {
+ metricsExecutorService.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ pipeline
+ .apply(
+ // Use maxReadTime to force unbounded mode.
+
GenerateSequence.from(0).to(NUM_ELEMENTS).withMaxReadTime(Duration.standardDays(1)))
+ .apply(ParDo.of(new CountingDoFn()));
+ pipeline.run();
+ }
+
+ private static class CountingDoFn extends DoFn<Long, Long> {
+ private final Counter counter = Metrics.counter(MetricsPusherTest.class,
"counter");
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ try {
+ counter.inc();
+ context.output(context.element());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ThreadLeakTracker.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ThreadLeakTracker.java
new file mode 100644
index 00000000000..8a680c80af9
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ThreadLeakTracker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.stream.Stream;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Tracks the threads created during a test method execution (or class using
@ClassRule)
+ * and fails if some still exists after the test method execution.
+ */
+public class ThreadLeakTracker implements TestRule {
+ private final Field groupField;
+
+ public ThreadLeakTracker() {
+ try {
+ groupField = Thread.class.getDeclaredField("group");
+ if (!groupField.isAccessible()) {
+ groupField.setAccessible(true);
+ }
+ } catch (final NoSuchFieldException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public Statement apply(final Statement base, final Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ final Thread thread = Thread.currentThread();
+ final ThreadGroup threadGroup = thread.getThreadGroup();
+ final ThreadGroup testGroup = new ThreadGroup(threadGroup,
Long.toString(thread.getId()));
+ groupField.set(thread, testGroup);
+ try {
+ base.evaluate();
+ } finally {
+ groupField.set(thread, threadGroup);
+ final Thread[] threads = listThreads();
+ final List<Thread> leaked = Stream.of(threads)
+ .filter(t -> t.getThreadGroup() == testGroup)
+ .collect(toList());
+ if (!leaked.isEmpty()) {
+ fail("Some threads leaked: " + leaked.stream().map(Thread::getName)
+ .collect(joining("\n- ", "\n- ", "")));
+ }
+ }
+ }
+ };
+ }
+
+ private Thread[] listThreads() {
+ final int count = Thread.activeCount();
+ final Thread[] threads = new Thread[Math.max(count * 2, 16)];
+ final int threadCount = Thread.enumerate(threads);
+ final Thread[] array = new Thread[threadCount];
+ System.arraycopy(threads, 0, array, 0, threadCount);
+ return array;
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 96707)
Time Spent: 40m (was: 0.5h)
> ExecutorServiceParallelExecutorTest#ensureMetricsThreadDoesntLeak in PR #4965
> does not pass in gradle
> -----------------------------------------------------------------------------------------------------
>
> Key: BEAM-4088
> URL: https://issues.apache.org/jira/browse/BEAM-4088
> Project: Beam
> Issue Type: Sub-task
> Components: testing
> Reporter: Etienne Chauchot
> Assignee: Etienne Chauchot
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> This is a new test being added to ensure threads don't leak. The failure
> seems to indicate that threads do leak.
> This test fails using gradle but previously passed using maven
> PR: https://github.com/apache/beam/pull/4965
> Presubmit Failure:
> * https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/4059/
> *
> https://scans.gradle.com/s/grha56432j3t2/tests/jqhvlvf72f7pg-ipde5etqqejoa?openStackTraces=WzBd
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)