[
https://issues.apache.org/jira/browse/BEAM-3119?focusedWorklogId=95561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95561
]
ASF GitHub Bot logged work on BEAM-3119:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Apr/18 15:49
Start Date: 26/Apr/18 15:49
Worklog Time Spent: 10m
Work Description: echauchot closed pull request #4965: BEAM-3119 ensure
the metrics thread pool is related to an execution
URL: https://github.com/apache/beam/pull/4965
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index 4ef1c61ffbf..0656e147b4a 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -21,15 +21,13 @@
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
@@ -52,15 +50,6 @@
*/
class DirectMetrics extends MetricResults {
- // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in
the DirectRunner.
- private static final ExecutorService COUNTER_COMMITTER =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setThreadFactory(MoreExecutors.platformThreadFactory())
- .setDaemon(true)
- .setNameFormat("direct-metrics-counter-committer")
- .build());
-
private interface MetricAggregation<UpdateT, ResultT> {
UpdateT zero();
UpdateT combine(Iterable<UpdateT> updates);
@@ -76,6 +65,8 @@
private static class DirectMetric<UpdateT, ResultT> {
private final MetricAggregation<UpdateT, ResultT> aggregation;
+ private final Executor executor;
+
private final AtomicReference<UpdateT> finishedCommitted;
private final Object attemptedLock = new Object();
@@ -85,8 +76,10 @@
private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted
=
new ConcurrentHashMap<>();
- public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) {
+ public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation,
+ Executor executor) {
this.aggregation = aggregation;
+ this.executor = executor;
finishedCommitted = new AtomicReference<>(aggregation.zero());
finishedAttempted = aggregation.zero();
}
@@ -115,7 +108,7 @@ public void commitPhysical(final CommittedBundle<?> bundle,
final UpdateT finalC
// 2. We submit a runnable that will commit the update and remove the
tentative value in
// a synchronized block.
inflightAttempted.put(bundle, finalCumulative);
- COUNTER_COMMITTER.submit(
+ executor.execute(
() -> {
synchronized (attemptedLock) {
finishedAttempted =
aggregation.combine(asList(finishedAttempted, finalCumulative));
@@ -223,13 +216,12 @@ public GaugeResult extract(GaugeData data) {
};
/** The current values of counters in memory. */
- private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters =
- new MetricsMap<>(unusedKey -> new DirectMetric<>(COUNTER));
+ private final MetricsMap<MetricKey, DirectMetric<Long, Long>> counters;
+
+ private final MetricsMap<MetricKey, DirectMetric<DistributionData,
DistributionResult>>
+ distributions;
- private MetricsMap<MetricKey, DirectMetric<DistributionData,
DistributionResult>> distributions =
- new MetricsMap<>(unusedKey -> new DirectMetric<>(DISTRIBUTION));
- private MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauges =
- new MetricsMap<>(unusedKey -> new DirectMetric<>(GAUGE));
+ private final MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>>
gauges;
@AutoValue
abstract static class DirectMetricQueryResults implements MetricQueryResults
{
@@ -256,6 +248,13 @@ public static MetricQueryResults create(
}
}
+ DirectMetrics(ExecutorService executorService) {
+ this.counters = new MetricsMap<>(unusedKey -> new DirectMetric<>(COUNTER,
executorService));
+ this.distributions =
+ new MetricsMap<>(unusedKey -> new DirectMetric<>(DISTRIBUTION,
executorService));
+ this.gauges = new MetricsMap<>(unusedKey -> new DirectMetric<>(GAUGE,
executorService));
+ }
+
@Override
public MetricQueryResults queryMetrics(MetricsFilter filter) {
ImmutableList.Builder<MetricResult<Long>> counterResults =
ImmutableList.builder();
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 32ba69dc059..b266c719cce 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -22,6 +22,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -29,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -172,48 +176,53 @@ public DirectPipelineResult run(Pipeline
originalPipeline) {
}
pipeline.replaceAll(defaultTransformOverrides());
MetricsEnvironment.setMetricsSupported(true);
- DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
- pipeline.traverseTopologically(graphVisitor);
+ try {
+ DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
+ pipeline.traverseTopologically(graphVisitor);
- @SuppressWarnings("rawtypes")
- KeyedPValueTrackingVisitor keyedPValueVisitor =
KeyedPValueTrackingVisitor.create();
- pipeline.traverseTopologically(keyedPValueVisitor);
+ @SuppressWarnings("rawtypes")
+ KeyedPValueTrackingVisitor keyedPValueVisitor =
KeyedPValueTrackingVisitor.create();
+ pipeline.traverseTopologically(keyedPValueVisitor);
- DisplayDataValidator.validatePipeline(pipeline);
- DisplayDataValidator.validateOptions(getPipelineOptions());
+ DisplayDataValidator.validatePipeline(pipeline);
+ DisplayDataValidator.validateOptions(getPipelineOptions());
- DirectGraph graph = graphVisitor.getGraph();
- EvaluationContext context =
- EvaluationContext.create(
- getPipelineOptions(),
- clockSupplier.get(),
- Enforcement.bundleFactoryFor(enabledEnforcements, graph),
- graph,
- keyedPValueVisitor.getKeyedPValues());
+ DirectGraph graph = graphVisitor.getGraph();
+ ExecutorService metricsPool = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setThreadFactory(MoreExecutors.platformThreadFactory())
+ .setDaemon(false) // otherwise you say you want to leak, please
don't!
+ .setNameFormat("direct-metrics-counter-committer")
+ .build());
+ EvaluationContext context = EvaluationContext.create(
+ getPipelineOptions(), clockSupplier.get(),
+ Enforcement.bundleFactoryFor(enabledEnforcements, graph),
+ graph, keyedPValueVisitor.getKeyedPValues(),
+ metricsPool);
- TransformEvaluatorRegistry registry =
TransformEvaluatorRegistry.defaultRegistry(context);
- PipelineExecutor executor =
- ExecutorServiceParallelExecutor.create(
- options.getTargetParallelism(),
- registry,
- Enforcement.defaultModelEnforcements(enabledEnforcements),
- context);
- executor.start(graph, RootProviderRegistry.defaultRegistry(context));
+ TransformEvaluatorRegistry registry =
TransformEvaluatorRegistry.defaultRegistry(context);
+ PipelineExecutor executor = ExecutorServiceParallelExecutor.create(
+ options.getTargetParallelism(), registry,
+ Enforcement.defaultModelEnforcements(enabledEnforcements), context,
metricsPool);
+ executor.start(graph, RootProviderRegistry.defaultRegistry(context));
- DirectPipelineResult result = new DirectPipelineResult(executor, context);
- if (options.isBlockOnRun()) {
- try {
- result.waitUntilFinish();
- } catch (UserCodeException userException) {
- throw new PipelineExecutionException(userException.getCause());
- } catch (Throwable t) {
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
+ DirectPipelineResult result = new DirectPipelineResult(executor,
context);
+ if (options.isBlockOnRun()) {
+ try {
+ result.waitUntilFinish();
+ } catch (UserCodeException userException) {
+ throw new PipelineExecutionException(userException.getCause());
+ } catch (Throwable t) {
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ throw new RuntimeException(t);
}
- throw new RuntimeException(t);
}
+ return result;
+ } finally {
+ MetricsEnvironment.setMetricsSupported(false);
}
- return result;
}
/**
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 918083fadb5..0a7dc9ee681 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
@@ -94,13 +95,17 @@
private final Set<PValue> keyedPValues;
+ private final ExecutorService executorService;
+
public static EvaluationContext create(
- DirectOptions options,
- Clock clock,
- BundleFactory bundleFactory,
- DirectGraph graph,
- Set<PValue> keyedPValues) {
- return new EvaluationContext(options, clock, bundleFactory, graph,
keyedPValues);
+ DirectOptions options,
+ Clock clock,
+ BundleFactory bundleFactory,
+ DirectGraph graph,
+ Set<PValue> keyedPValues,
+ ExecutorService executorService) {
+ return new EvaluationContext(
+ options, clock, bundleFactory, graph, keyedPValues, executorService);
}
private EvaluationContext(
@@ -108,18 +113,20 @@ private EvaluationContext(
Clock clock,
BundleFactory bundleFactory,
DirectGraph graph,
- Set<PValue> keyedPValues) {
+ Set<PValue> keyedPValues,
+ ExecutorService executorService) {
this.options = checkNotNull(options);
this.clock = clock;
this.bundleFactory = checkNotNull(bundleFactory);
this.graph = checkNotNull(graph);
this.keyedPValues = keyedPValues;
+ this.executorService = executorService;
this.watermarkManager = WatermarkManager.create(clock, graph);
this.sideInputContainer = SideInputContainer.create(this,
graph.getViews());
this.applicationStateInternals = new ConcurrentHashMap<>();
- this.metrics = new DirectMetrics();
+ this.metrics = new DirectMetrics(executorService);
this.callbackExecutor =
WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
}
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index ab759287637..3131798a097 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -71,23 +71,28 @@
private final QueueMessageReceiver visibleUpdates;
+ private final ExecutorService metricsExecutor;
+
private AtomicReference<State> pipelineState = new
AtomicReference<>(State.RUNNING);
public static ExecutorServiceParallelExecutor create(
int targetParallelism,
TransformEvaluatorRegistry registry,
Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
- EvaluationContext context) {
+ EvaluationContext context,
+ ExecutorService metricsExecutor) {
return new ExecutorServiceParallelExecutor(
- targetParallelism, registry, transformEnforcements, context);
+ targetParallelism, registry, transformEnforcements, context,
metricsExecutor);
}
private ExecutorServiceParallelExecutor(
int targetParallelism,
TransformEvaluatorRegistry registry,
Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
- EvaluationContext context) {
+ EvaluationContext context,
+ ExecutorService metricsExecutor) {
this.targetParallelism = targetParallelism;
+ this.metricsExecutor = metricsExecutor;
// Don't use Daemon threads for workers. The Pipeline should continue to
execute even if there
// are no other active threads (for example, because waitUntilFinish was
not called)
this.executorService =
@@ -298,6 +303,11 @@ private void shutdownIfNecessary(State newState) {
} catch (final RuntimeException re) {
errors.add(re);
}
+ try {
+ metricsExecutor.shutdown();
+ } catch (final RuntimeException re) {
+ errors.add(re);
+ }
try {
registry.cleanup();
} catch (final Exception e) {
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 751fc942bed..acf5e1ce4fd 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -26,6 +26,8 @@
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
@@ -37,6 +39,7 @@
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.joda.time.Instant;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -60,11 +63,19 @@
private static final MetricName NAME3 = MetricName.named("ns2", "name1");
private static final MetricName NAME4 = MetricName.named("ns2", "name2");
- private DirectMetrics metrics = new DirectMetrics();
+ private DirectMetrics metrics;
+ private ExecutorService executor;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ executor = Executors.newSingleThreadExecutor();
+ metrics = new DirectMetrics(executor);
+ }
+
+ @After
+ public void tearDown() {
+ executor.shutdownNow();
}
@SuppressWarnings("unchecked")
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index f43071f3300..97cef83cedc 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -30,6 +30,7 @@
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateNamespaces;
@@ -119,7 +120,8 @@ public void setup() {
NanosOffsetClock.create(),
bundleFactory,
graph,
- keyedPValueTrackingVisitor.getKeyedPValues());
+ keyedPValueTrackingVisitor.getKeyedPValues(),
+ Executors.newSingleThreadExecutor());
createdProducer = graph.getProducer(created);
downstreamProducer = graph.getProducer(downstream);
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
new file mode 100644
index 00000000000..216efe53891
--- /dev/null
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ThreadLeakTracker;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+/**
+ * Validates some basic behavior for the ExecutorServiceParallelExecutor.
+ */
+public class ExecutorServiceParallelExecutorTest {
+
+ @Rule
+ public final TestRule threadTracker = new ThreadLeakTracker();
+
+ @Rule
+ public final TestName testName = new TestName();
+
+ @Test
+ public void ensureMetricsThreadDoesntLeak() {
+ final DirectGraph graph = DirectGraph.create(
+ emptyMap(), emptyMap(), LinkedListMultimap.create(),
+ LinkedListMultimap.create(), emptySet(), emptyMap());
+ final ExecutorService executorService = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(false)
+ .setNameFormat("dontleak_" + getClass().getName() + "#" +
testName.getMethodName())
+ .build());
+
+ // fake a metrics usage
+ executorService.submit(() -> {});
+
+ final EvaluationContext context = EvaluationContext.create(
+ PipelineOptionsFactory.create().as(DirectOptions.class),
+ MockClock.fromInstant(Instant.now()),
+ CloningBundleFactory.create(), graph, emptySet(), executorService);
+ ExecutorServiceParallelExecutor
+ .create(
+ 2,
+ TransformEvaluatorRegistry.defaultRegistry(context),
+ emptyMap(),
+ context,
+ executorService)
+ .stop();
+ }
+}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ThreadLeakTracker.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ThreadLeakTracker.java
new file mode 100644
index 00000000000..8a680c80af9
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ThreadLeakTracker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.stream.Stream;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Tracks the threads created during a test method execution (or class using
@ClassRule)
+ * and fails if some still exists after the test method execution.
+ */
+public class ThreadLeakTracker implements TestRule {
+ private final Field groupField;
+
+ public ThreadLeakTracker() {
+ try {
+ groupField = Thread.class.getDeclaredField("group");
+ if (!groupField.isAccessible()) {
+ groupField.setAccessible(true);
+ }
+ } catch (final NoSuchFieldException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public Statement apply(final Statement base, final Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ final Thread thread = Thread.currentThread();
+ final ThreadGroup threadGroup = thread.getThreadGroup();
+ final ThreadGroup testGroup = new ThreadGroup(threadGroup,
Long.toString(thread.getId()));
+ groupField.set(thread, testGroup);
+ try {
+ base.evaluate();
+ } finally {
+ groupField.set(thread, threadGroup);
+ final Thread[] threads = listThreads();
+ final List<Thread> leaked = Stream.of(threads)
+ .filter(t -> t.getThreadGroup() == testGroup)
+ .collect(toList());
+ if (!leaked.isEmpty()) {
+ fail("Some threads leaked: " + leaked.stream().map(Thread::getName)
+ .collect(joining("\n- ", "\n- ", "")));
+ }
+ }
+ }
+ };
+ }
+
+ private Thread[] listThreads() {
+ final int count = Thread.activeCount();
+ final Thread[] threads = new Thread[Math.max(count * 2, 16)];
+ final int threadCount = Thread.enumerate(threads);
+ final Thread[] array = new Thread[threadCount];
+ System.arraycopy(threads, 0, array, 0, threadCount);
+ return array;
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 95561)
Time Spent: 4.5h (was: 4h 20m)
> direct-metrics-counter-committer threads are leaking
> ----------------------------------------------------
>
> Key: BEAM-3119
> URL: https://issues.apache.org/jira/browse/BEAM-3119
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Reporter: Etienne Chauchot
> Assignee: Romain Manni-Bucau
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> When I run ElasticsearchIOTests using ESv5, there is a thread leak control
> mechanism ({{com.carrotsearch.randomizedtesting.ThreadLeakControl}}). It
> waits for 5s for non-terminated threads at the end of a test. It detects
> leaked {{direct-metrics-counter-committer}} thread.
> {code}
> com.carrotsearch.randomizedtesting.ThreadLeakError: There are still zombie
> threads that couldn't be terminated:
> 1) Thread[id=296, name=direct-metrics-counter-committer,
> state=TIMED_WAITING, group=TGRP-ElasticsearchIOTest]
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
> at
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
> at
> java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> at __randomizedtesting.SeedInfo.seed([59E504CA1B0DD6A8]:0){code}
> I tried to increase the timeout to 30s (by patching
> randomizedtesting-runner-2.5.0.jar) but still gets a zombie thread.
> To reproduce, just comment
> {code}
> @ThreadLeakScope(ThreadLeakScope.Scope.NONE)
> {code}
> in
> {code}
> beam/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
> {code}
> and run
> {code}
> testRead()
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)