[
https://issues.apache.org/jira/browse/BEAM-5110?focusedWorklogId=136795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136795
]
ASF GitHub Bot logged work on BEAM-5110:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Aug/18 00:44
Start Date: 22/Aug/18 00:44
Worklog Time Spent: 10m
Work Description: tweise closed pull request #6189: [BEAM-5110]
Explicitly count the references for BatchFlinkExecutableStageContext …
URL: https://github.com/apache/beam/pull/6189
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from
a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
index 9f7c1710aa1..5ba04c60739 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.runners.flink.translation.functions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import java.io.IOException;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
@@ -35,7 +32,7 @@
import org.slf4j.LoggerFactory;
/** Implementation of a {@link FlinkExecutableStageContext} for batch jobs. */
-class FlinkBatchExecutableStageContext implements FlinkExecutableStageContext {
+class FlinkBatchExecutableStageContext implements FlinkExecutableStageContext,
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkBatchExecutableStageContext.class);
private final JobBundleFactory jobBundleFactory;
@@ -68,32 +65,20 @@ public StateRequestHandler getStateRequestHandler(
}
@Override
- protected void finalize() throws Exception {
+ public void close() throws Exception {
jobBundleFactory.close();
}
enum BatchFactory implements Factory {
- INSTANCE;
+ REFERENCE_COUNTING;
- @SuppressWarnings("Immutable") // observably immutable
- private final LoadingCache<JobInfo, FlinkBatchExecutableStageContext>
cachedContexts;
-
- BatchFactory() {
- cachedContexts =
- CacheBuilder.newBuilder()
- .weakValues()
- .build(
- new CacheLoader<JobInfo, FlinkBatchExecutableStageContext>()
{
- @Override
- public FlinkBatchExecutableStageContext load(JobInfo
jobInfo) throws Exception {
- return create(jobInfo);
- }
- });
- }
+ private static final ReferenceCountingFlinkExecutableStageContextFactory
actualFactory =
+ ReferenceCountingFlinkExecutableStageContextFactory.create(
+ FlinkBatchExecutableStageContext::create);
@Override
public FlinkExecutableStageContext get(JobInfo jobInfo) {
- return cachedContexts.getUnchecked(jobInfo);
+ return actualFactory.get(jobInfo);
}
}
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index a1e516c2884..ba59b534742 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -25,18 +25,20 @@
import org.apache.flink.api.common.functions.RuntimeContext;
/** The Flink context required in order to execute {@link ExecutableStage
stages}. */
-public interface FlinkExecutableStageContext {
+public interface FlinkExecutableStageContext extends AutoCloseable {
/**
* Creates {@link FlinkExecutableStageContext} instances. Serializable so
that factories can be
* defined at translation time and distributed to TaskManagers.
*/
interface Factory extends Serializable {
+
+ /** Get or create {@link FlinkExecutableStageContext} for given {@link
JobInfo}. */
FlinkExecutableStageContext get(JobInfo jobInfo);
}
static Factory batchFactory() {
- return FlinkBatchExecutableStageContext.BatchFactory.INSTANCE;
+ return FlinkBatchExecutableStageContext.BatchFactory.REFERENCE_COUNTING;
}
StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 3da58e9c516..c501274d0b0 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -65,8 +65,8 @@
// Worker-local fields. These should only be constructed and consumed on
Flink TaskManagers.
private transient RuntimeContext runtimeContext;
- private transient FlinkExecutableStageContext stageContext;
private transient StateRequestHandler stateRequestHandler;
+ private transient FlinkExecutableStageContext stageContext;
private transient StageBundleFactory stageBundleFactory;
private transient BundleProgressHandler progressHandler;
@@ -127,7 +127,7 @@ public void mapPartition(
@Override
public void close() throws Exception {
try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
- // Remove the reference to stageContext and make stageContext available
for garbage collection.
+ try (AutoCloseable closable = stageContext) {}
stageContext = null;
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
new file mode 100644
index 00000000000..5121038010c
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FlinkExecutableStageContext.Factory} which counts
FlinkExecutableStageContext reference
+ * for book keeping.
+ */
+public class ReferenceCountingFlinkExecutableStageContextFactory
+ implements FlinkExecutableStageContext.Factory {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
+ private static final int TTL_IN_SECONDS = 30;
+ private static final int MAX_RETRY = 3;
+
+ private final Creator creator;
+ private transient volatile ScheduledExecutorService executor;
+ private transient volatile ConcurrentHashMap<String, WrappedContext>
keyRegistry;
+
+ public static ReferenceCountingFlinkExecutableStageContextFactory
create(Creator creator) {
+ return new ReferenceCountingFlinkExecutableStageContextFactory(creator);
+ }
+
+ private ReferenceCountingFlinkExecutableStageContextFactory(Creator creator)
{
+ this.creator = creator;
+ }
+
+ @Override
+ public FlinkExecutableStageContext get(JobInfo jobInfo) {
+ // Retry is needed in case where an existing wrapper is picked from the
cache but by
+ // the time we accessed wrapper.referenceCount, the wrapper was tombstoned
by a pending
+ // release task.
+ // This race condition is highly unlikely to happen as there is no
systematic coding
+ // practice which can cause this error because of TTL. However, even in
very unlikely case
+ // when it happen we have the retry which get a valid context.
+ // Note: There is no leak in this logic as the cleanup is only done in
release.
+ // In case of usage error where release is called before corresponding get
finishes,
+ // release might throw an error. If release did not throw an error than we
can be sure that
+ // the state of the system remains valid and appropriate cleanup will be
done at TTL.
+ for (int retry = 0; retry < MAX_RETRY; retry++) {
+ // ConcurrentHashMap will handle the thread safety at the creation time.
+ WrappedContext wrapper =
+ getCache()
+ .computeIfAbsent(
+ jobInfo.jobId(),
+ jobId -> {
+ try {
+ return new WrappedContext(jobInfo,
creator.apply(jobInfo));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to create context for job " +
jobInfo.jobId(), e);
+ }
+ });
+ // Take a lock on wrapper before modifying reference count.
+ // Use null referenceCount == null as a tombstone for the wrapper.
+ synchronized (wrapper) {
+ if (wrapper.referenceCount != null) {
+ // The wrapper is still valid.
+ // Release has not yet got the lock and has not yet removed the
wrapper.
+ wrapper.referenceCount.incrementAndGet();
+ return wrapper;
+ }
+ }
+ }
+
+ throw new RuntimeException(
+ String.format(
+ "Max retry %s exhausted while creating Context for job %s",
+ MAX_RETRY, jobInfo.jobId()));
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void scheduleRelease(JobInfo jobInfo) {
+ WrappedContext wrapper = getCache().get(jobInfo.jobId());
+ Preconditions.checkState(
+ wrapper != null, "Releasing context for unknown job: " +
jobInfo.jobId());
+ // Schedule task to clean the container later.
+ getExecutor().schedule(() -> release(wrapper), TTL_IN_SECONDS,
TimeUnit.SECONDS);
+ }
+
+ private ConcurrentHashMap<String, WrappedContext> getCache() {
+ // Lazily initialize keyRegistry because serialization will set it to null.
+ if (keyRegistry != null) {
+ return keyRegistry;
+ }
+ synchronized (this) {
+ if (keyRegistry == null) {
+ keyRegistry = new ConcurrentHashMap<>();
+ }
+ return keyRegistry;
+ }
+ }
+
+ private ScheduledExecutorService getExecutor() {
+ // Lazily initialize executor because serialization will set it to null.
+ if (executor != null) {
+ return executor;
+ }
+ synchronized (this) {
+ if (executor == null) {
+ executor =
+ Executors.newScheduledThreadPool(1, new
ThreadFactoryBuilder().setDaemon(true).build());
+ }
+ return executor;
+ }
+ }
+
+ @VisibleForTesting
+ void release(FlinkExecutableStageContext context) {
+ @SuppressWarnings({"unchecked", "Not exected to be called from outside."})
+ WrappedContext wrapper = (WrappedContext) context;
+ synchronized (wrapper) {
+ if (wrapper.referenceCount.decrementAndGet() == 0) {
+ // Tombstone wrapper.
+ wrapper.referenceCount = null;
+ if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
+ try {
+ wrapper.closeActual();
+ } catch (Exception e) {
+ LOG.error("Unable to close.", e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * {@link WrappedContext} does not expose equals of actual {@link
FlinkExecutableStageContext}.
+ */
+ private class WrappedContext implements FlinkExecutableStageContext {
+ private JobInfo jobInfo;
+ private AtomicInteger referenceCount;
+ private FlinkExecutableStageContext context;
+
+ /** {@link WrappedContext#equals(Object)} is only based on {@link
JobInfo#jobId()}. */
+ WrappedContext(JobInfo jobInfo, FlinkExecutableStageContext context) {
+ this.jobInfo = jobInfo;
+ this.context = context;
+ this.referenceCount = new AtomicInteger(0);
+ }
+
+ @Override
+ public StageBundleFactory getStageBundleFactory(ExecutableStage
executableStage) {
+ return context.getStageBundleFactory(executableStage);
+ }
+
+ @Override
+ public StateRequestHandler getStateRequestHandler(
+ ExecutableStage executableStage, RuntimeContext runtimeContext) {
+ return context.getStateRequestHandler(executableStage, runtimeContext);
+ }
+
+ @Override
+ public void close() {
+ // Just schedule the context as we want to reuse it if possible.
+ scheduleRelease(jobInfo);
+ }
+
+ private void closeActual() throws Exception {
+ context.close();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WrappedContext that = (WrappedContext) o;
+ return Objects.equals(jobInfo.jobId(), that.jobInfo.jobId());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobInfo);
+ }
+
+ @Override
+ public String toString() {
+ return "ContextWrapper{"
+ + "jobId='"
+ + jobInfo
+ + '\''
+ + ", referenceCount="
+ + referenceCount
+ + '}';
+ }
+ }
+
+ /** Interface for creator which extends Serializable. */
+ public interface Creator
+ extends ThrowingFunction<JobInfo, FlinkExecutableStageContext>,
Serializable {}
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 2c042c78db7..d63a3cb117e 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -150,6 +150,7 @@ private void
processElementWithSdkHarness(WindowedValue<InputT> element) throws
public void close() throws Exception {
try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
// Remove the reference to stageContext and make stageContext available
for garbage collection.
+ try (AutoCloseable closable = stageContext) {}
stageContext = null;
super.close();
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
new file mode 100644
index 00000000000..cf758647307
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import
org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.Creator;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ReferenceCountingFlinkExecutableStageContextFactory}. */
+@RunWith(JUnit4.class)
+public class ReferenceCountingFlinkExecutableStageContextFactoryTest {
+
+ @Test
+ public void testCreateReuseReleaseCreate() throws Exception {
+
+ Creator creator = mock(Creator.class);
+ FlinkExecutableStageContext c1 = mock(FlinkExecutableStageContext.class);
+ FlinkExecutableStageContext c2 = mock(FlinkExecutableStageContext.class);
+ FlinkExecutableStageContext c3 = mock(FlinkExecutableStageContext.class);
+ FlinkExecutableStageContext c4 = mock(FlinkExecutableStageContext.class);
+ when(creator.apply(any(JobInfo.class)))
+ .thenReturn(c1)
+ .thenReturn(c2)
+ .thenReturn(c3)
+ .thenReturn(c4);
+ ReferenceCountingFlinkExecutableStageContextFactory factory =
+ ReferenceCountingFlinkExecutableStageContextFactory.create(creator);
+ JobInfo jobA = mock(JobInfo.class);
+ when(jobA.jobId()).thenReturn("jobA");
+ JobInfo jobB = mock(JobInfo.class);
+ when(jobB.jobId()).thenReturn("jobB");
+ FlinkExecutableStageContext ac1A = factory.get(jobA); // 1 open jobA
+ FlinkExecutableStageContext ac2B = factory.get(jobB); // 1 open jobB
+ Assert.assertSame(
+ "Context should be cached and reused.", ac1A, factory.get(jobA)); // 2
open jobA
+ Assert.assertSame(
+ "Context should be cached and reused.", ac2B, factory.get(jobB)); // 2
open jobB
+ factory.release(ac1A); // 1 open jobA
+ Assert.assertSame(
+ "Context should be cached and reused.", ac1A, factory.get(jobA)); // 2
open jobA
+ factory.release(ac1A); // 1 open jobA
+ factory.release(ac1A); // 0 open jobA
+ FlinkExecutableStageContext ac3A = factory.get(jobA); // 1 open jobA
+ Assert.assertNotSame("We should get a new instance.", ac1A, ac3A);
+ Assert.assertSame(
+ "Context should be cached and reused.", ac3A, factory.get(jobA)); // 2
open jobA
+ factory.release(ac3A); // 1 open jobA
+ factory.release(ac3A); // 0 open jobA
+ Assert.assertSame(
+ "Context should be cached and reused.", ac2B, factory.get(jobB)); // 3
open jobB
+ factory.release(ac2B); // 2 open jobB
+ factory.release(ac2B); // 1 open jobB
+ factory.release(ac2B); // 0 open jobB
+ FlinkExecutableStageContext ac4B = factory.get(jobB); // 1 open jobB
+ Assert.assertNotSame("We should get a new instance.", ac2B, ac4B);
+ factory.release(ac4B); // 0 open jobB
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 136795)
Time Spent: 3h (was: 2h 50m)
> Reconcile Flink JVM singleton management with deployment
> -------------------------------------------------------
>
> Key: BEAM-5110
> URL: https://issues.apache.org/jira/browse/BEAM-5110
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 3h
> Remaining Estimate: 0h
>
> [~angoenka] noticed through debugging that multiple instances of
> BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when
> executing in standalone cluster mode. This context factory is responsible for
> maintaining singleton state across a TaskManager (JVM) in order to share SDK
> Environments across workers in a given job. The multiple-loading breaks
> singleton semantics and results in an indeterminate number of Environments
> being created.
> It turns out that the [Flink classloading
> mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
> is determined by deployment mode. Note that "user code" as referenced by
> this link is actually the Flink job server jar. Actual end-user code lives
> inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using
> file locks and/or additional gRPC servers), we need to force non-dynamic
> classloading. For example, this happens when jobs are submitted to YARN for
> one-off deployments via `flink run`. However, connecting to an existing
> (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to
> enforce) deployment modes that are consistent with our requirements, or (if
> possible) create a custom classloader that enforces singleton loading.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)