Repository: incubator-beam Updated Branches: refs/heads/master 351fc3efa -> be9b15803
Use an AtomicReference in InProcessSideInputContainer This fixes a TOCTOU race in the contents updating logic, where the determination that the current pane should replace the contents of the side input and the replacement are not a single atomic operation. Using AtomicReference allows the use of compareAndSet to ensure that the replacement can only occur on the pane that the decision to replace was made with. Fixes a race where a pane could be the latest, and replace a pane, but would be lost due to an earlier pane being written between the invalidation and loading of contents. Fixes a race where a reader can incorrectly read an empty iterable as the contents of a PCollectionView, due to the read occurring between the invalidate and reload steps. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77475992 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77475992 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77475992 Branch: refs/heads/master Commit: 7747599233aee96c8077c46db5ce093a856b6139 Parents: 4870525 Author: Thomas Groh <[email protected]> Authored: Tue May 10 11:27:37 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue May 10 15:02:29 2016 -0700 ---------------------------------------------------------------------- .../direct/InProcessSideInputContainer.java | 199 +++++++++---------- .../direct/InProcessSideInputContainerTest.java | 95 +++++---- 2 files changed, 147 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77475992/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java index 78889dc..1ef8f13 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -33,18 +33,16 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -54,36 +52,26 @@ import javax.annotation.Nullable; * available and writing to a {@link PCollectionView}. */ class InProcessSideInputContainer { - private final InProcessEvaluationContext evaluationContext; private final Collection<PCollectionView<?>> containedViews; - private final LoadingCache<PCollectionViewWindow<?>, - SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows; + private final LoadingCache< + PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> + viewByWindows; /** * Create a new {@link InProcessSideInputContainer} with the provided views and the provided * context. */ public static InProcessSideInputContainer create( - InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) { - CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>> - loader = new CacheLoader<PCollectionViewWindow<?>, - SettableFuture<Iterable<? 
extends WindowedValue<?>>>>() { - @Override - public SettableFuture<Iterable<? extends WindowedValue<?>>> load( - PCollectionViewWindow<?> view) { - return SettableFuture.create(); - } - }; - LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>> - viewByWindows = CacheBuilder.newBuilder().build(loader); - return new InProcessSideInputContainer(context, containedViews, viewByWindows); + final InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) { + LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> + viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context)); + return new InProcessSideInputContainer(containedViews, viewByWindows); } - private InProcessSideInputContainer(InProcessEvaluationContext context, + private InProcessSideInputContainer( Collection<PCollectionView<?>> containedViews, - LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>> - viewByWindows) { - this.evaluationContext = context; + LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> + viewByWindows) { this.containedViews = ImmutableSet.copyOf(containedViews); this.viewByWindows = viewByWindows; } @@ -149,29 +137,75 @@ class InProcessSideInputContainer { private void updatePCollectionViewWindowValues( PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) { PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window); - SettableFuture<Iterable<? extends WindowedValue<?>>> future = null; - try { - future = viewByWindows.get(windowedView); - if (future.isDone()) { - Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator(); - PaneInfo newPane = windowValues.iterator().next().getPane(); - // The current value may have no elements, if no elements were produced for the window, - // but we are recieving late data. 
- if (!existingValues.hasNext() - || newPane.getIndex() > existingValues.next().getPane().getIndex()) { - viewByWindows.invalidate(windowedView); - viewByWindows.get(windowedView).set(windowValues); - } - } else { - future.set(windowValues); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (future != null && !future.isDone()) { - future.set(Collections.<WindowedValue<?>>emptyList()); - } - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); + AtomicReference<Iterable<? extends WindowedValue<?>>> contents = + viewByWindows.getUnchecked(windowedView); + if (contents.compareAndSet(null, windowValues)) { + // the value had never been set, so we set it and are done. + return; + } + PaneInfo newPane = windowValues.iterator().next().getPane(); + + Iterable<? extends WindowedValue<?>> existingValues; + long existingPane; + do { + existingValues = contents.get(); + existingPane = + Iterables.isEmpty(existingValues) + ? -1L + : existingValues.iterator().next().getPane().getIndex(); + } while (newPane.getIndex() > existingPane + && !contents.compareAndSet(existingValues, windowValues)); + } + + private static class CallbackSchedulingLoader extends + CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> { + private final InProcessEvaluationContext context; + + public CallbackSchedulingLoader( + InProcessEvaluationContext context) { + this.context = context; + } + + @Override + public AtomicReference<Iterable<? extends WindowedValue<?>>> + load(PCollectionViewWindow<?> view) { + + AtomicReference<Iterable<? 
extends WindowedValue<?>>> contents = new AtomicReference<>(); + WindowingStrategy<?, ?> windowingStrategy = view.getView().getWindowingStrategyInternal(); + + context.scheduleAfterOutputWouldBeProduced(view.getView(), + view.getWindow(), + windowingStrategy, + new WriteEmptyViewContents(view.getView(), view.getWindow(), contents)); + return contents; + } + } + + private static class WriteEmptyViewContents implements Runnable { + private final PCollectionView<?> view; + private final BoundedWindow window; + private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents; + + private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window, + AtomicReference<Iterable<? extends WindowedValue<?>>> contents) { + this.contents = contents; + this.view = view; + this.window = window; + } + + @Override + public void run() { + // The requested window has closed without producing elements, so reflect that in + // the PCollectionView. If set has already been called, will do nothing. + contents.compareAndSet(null, Collections.<WindowedValue<?>>emptyList()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("view", view) + .add("window", window) + .toString(); } } @@ -190,43 +224,23 @@ class InProcessSideInputContainer { + "Contained views; %s", view, readerViews); - return getViewFuture(view, window).isDone(); + return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null; } @Override @Nullable public <T> T get(final PCollectionView<T> view, final BoundedWindow window) { - checkArgument( - readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); - try { - final Future<Iterable<? 
extends WindowedValue<?>>> future = getViewFuture(view, window); - // Safe covariant cast - @SuppressWarnings("unchecked") - Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get(); - return view.fromIterableInternal(values); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - /** - * Gets the future containing the contents of the provided {@link PCollectionView} in the - * provided {@link BoundedWindow}, setting up a callback to populate the future with empty - * contents if necessary. - */ - private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture( - final PCollectionView<T> view, final BoundedWindow window) { - PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window); - final SettableFuture<Iterable<? extends WindowedValue<?>>> future = - viewByWindows.getUnchecked(windowedView); - - WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal(); - evaluationContext.scheduleAfterOutputWouldBeProduced( - view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future)); - return future; + checkArgument(readerViews.contains(view), + "calling get(PCollectionView) with unknown view: " + view); + checkArgument(isReady(view, window), + "calling get(PCollectionView) with view %s that is not ready in window %s", + view, + window); + // Safe covariant cast + @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values = + (Iterable<WindowedValue<?>>) viewByWindows + .getUnchecked(PCollectionViewWindow.of(view, window)).get(); + return view.fromIterableInternal(values); } @Override @@ -240,31 +254,4 @@ class InProcessSideInputContainer { } } - private static class WriteEmptyViewContents implements Runnable { - private final PCollectionView<?> view; - private final BoundedWindow window; - private final SettableFuture<Iterable<? 
extends WindowedValue<?>>> future; - - private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window, - SettableFuture<Iterable<? extends WindowedValue<?>>> future) { - this.future = future; - this.view = view; - this.window = window; - } - - @Override - public void run() { - // The requested window has closed without producing elements, so reflect that in - // the PCollectionView. If set has already been called, will do nothing. - future.set(Collections.<WindowedValue<?>>emptyList()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("view", view) - .add("window", window) - .toString(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77475992/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java index 8f89e70..2f376dd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import org.apache.beam.sdk.coders.KvCoder; @@ -61,8 +62,10 @@ import org.mockito.stubbing.Answer; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * Tests for {@link 
InProcessSideInputContainer}. @@ -194,46 +197,12 @@ public class InProcessSideInputContainerTest { * there is data in the pane. */ @Test - public void getBlocksUntilPaneAvailable() throws Exception { - BoundedWindow window = - new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(1024L); - } - }; - Future<Double> singletonFuture = - getFutureOfView( - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)), - singletonView, - window); - - WindowedValue<Double> singletonValue = - WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); - - assertThat(singletonFuture.isDone(), is(false)); - container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue)); - assertThat(singletonFuture.get(), equalTo(4.75)); - } - - @Test - public void withPCollectionViewsWithPutInOriginalReturnsContents() throws Exception { - BoundedWindow window = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(1024L); - } - }; - SideInputReader newReader = - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)); - Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window); - - WindowedValue<Double> singletonValue = - WindowedValue.of(24.125, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + public void getNotReadyThrows() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("not ready"); - assertThat(singletonFuture.isDone(), is(false)); - container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue)); - assertThat(singletonFuture.get(), equalTo(24.125)); + container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) + .get(mapView, GlobalWindow.INSTANCE); } @Test @@ -448,15 +417,20 @@ public class InProcessSideInputContainerTest { } @Test - public void isReadyForEmptyWindowTrue() { + public void 
isReadyForEmptyWindowTrue() throws Exception { + CountDownLatch onComplete = new CountDownLatch(1); immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE); + CountDownLatch latch = invokeLatchedCallback(singletonView, GlobalWindow.INSTANCE, onComplete); ReadyCheckingSideInputReader reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); - immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE); + latch.countDown(); + if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) { + fail("Callback to set empty values did not complete!"); + } assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); } @@ -483,6 +457,45 @@ public class InProcessSideInputContainerTest { Mockito.any(Runnable.class)); } + /** + * When a callAfterWindowCloses with the specified view's producing transform, window, and + * windowing strategy is invoked, start a thread that will invoke the callback after the returned + * {@link CountDownLatch} is counted down once. 
+ */ + private CountDownLatch invokeLatchedCallback( + PCollectionView<?> view, BoundedWindow window, final CountDownLatch onComplete) { + final CountDownLatch runLatch = new CountDownLatch(1); + doAnswer( + new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Object callback = invocation.getArguments()[3]; + final Runnable callbackRunnable = (Runnable) callback; + Executors.newSingleThreadExecutor().submit(new Runnable() { + public void run() { + try { + if (!runLatch.await(1500L, TimeUnit.MILLISECONDS)) { + fail("Run latch didn't count down within timeout"); + } + callbackRunnable.run(); + onComplete.countDown(); + } catch (InterruptedException e) { + fail("Unexpectedly interrupted while waiting for latch to be counted down"); + } + } + }); + return null; + } + }) + .when(context) + .scheduleAfterOutputWouldBeProduced( + Mockito.eq(view), + Mockito.eq(window), + Mockito.eq(view.getWindowingStrategyInternal()), + Mockito.any(Runnable.class)); + return runLatch; + } + private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader, final PCollectionView<ValueT> view, final BoundedWindow window) { Callable<ValueT> callable = new Callable<ValueT>() {
