http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
deleted file mode 100644
index ad70bca..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link CopyOnAccessInMemoryStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class CopyOnAccessInMemoryStateInternalsTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
- private String key = "foo";
- @Test
- public void testGetWithEmpty() {
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = internals.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("bar");
- stringBag.add("baz");
- assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
-
- BagState<String> reReadStringBag = internals.state(namespace, bagTag);
- assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
- }
-
- @Test
- public void testGetWithAbsentInUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = internals.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("bar");
- stringBag.add("baz");
- assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
-
- BagState<String> reReadVoidBag = internals.state(namespace, bagTag);
- assertThat(reReadVoidBag.read(), containsInAnyOrder("baz", "bar"));
-
- BagState<String> underlyingState = underlying.state(namespace, bagTag);
- assertThat(underlyingState.read(), emptyIterable());
- }
-
- /**
- * Tests that retrieving state with an underlying StateInternals with an existing value returns
- * a value that initially has equal value to the provided state but can be modified without
- * modifying the existing state.
- */
- @Test
- public void testGetWithPresentInUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, ValueState<String>> valueTag = StateTags.value("foo",
StringUtf8Coder.of());
- ValueState<String> underlyingValue = underlying.state(namespace, valueTag);
- assertThat(underlyingValue.read(), nullValue(String.class));
-
- underlyingValue.write("bar");
- assertThat(underlyingValue.read(), equalTo("bar"));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- ValueState<String> copyOnAccessState = internals.state(namespace,
valueTag);
- assertThat(copyOnAccessState.read(), equalTo("bar"));
-
- copyOnAccessState.write("baz");
- assertThat(copyOnAccessState.read(), equalTo("baz"));
- assertThat(underlyingValue.read(), equalTo("bar"));
-
- ValueState<String> reReadUnderlyingValue = underlying.state(namespace,
valueTag);
- assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
- }
-
- @Test
- public void testBagStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<Integer>> valueTag = StateTags.bag("foo",
VarIntCoder.of());
- BagState<Integer> underlyingValue = underlying.state(namespace, valueTag);
- assertThat(underlyingValue.read(), emptyIterable());
-
- underlyingValue.add(1);
- assertThat(underlyingValue.read(), containsInAnyOrder(1));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
- assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
-
- copyOnAccessState.add(4);
- assertThat(copyOnAccessState.read(), containsInAnyOrder(4, 1));
- assertThat(underlyingValue.read(), containsInAnyOrder(1));
-
- BagState<Integer> reReadUnderlyingValue = underlying.state(namespace,
valueTag);
- assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
- }
-
- @Test
- public void testAccumulatorCombiningStateWithUnderlying() throws
CannotProvideCoderException {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn();
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- CoderRegistry reg = TestPipeline.create().getCoderRegistry();
- StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
- StateTags.combiningValue("summer",
- sumLongFn.getAccumulatorCoder(reg,
reg.getDefaultCoder(Long.class)), sumLongFn);
- CombiningState<Long, Long> underlyingValue = underlying.state(namespace,
stateTag);
- assertThat(underlyingValue.read(), equalTo(0L));
-
- underlyingValue.add(1L);
- assertThat(underlyingValue.read(), equalTo(1L));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- CombiningState<Long, Long> copyOnAccessState = internals.state(namespace,
stateTag);
- assertThat(copyOnAccessState.read(), equalTo(1L));
-
- copyOnAccessState.add(4L);
- assertThat(copyOnAccessState.read(), equalTo(5L));
- assertThat(underlyingValue.read(), equalTo(1L));
-
- CombiningState<Long, Long> reReadUnderlyingValue =
underlying.state(namespace, stateTag);
- assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
- }
-
- @Test
- public void testKeyedAccumulatorCombiningStateWithUnderlying() throws
Exception {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- KeyedCombineFn<String, Long, long[], Long> sumLongFn = new
Sum.SumLongFn().asKeyedFn();
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- CoderRegistry reg = TestPipeline.create().getCoderRegistry();
- StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag =
- StateTags.keyedCombiningValue(
- "summer",
- sumLongFn.getAccumulatorCoder(
- reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
- sumLongFn);
- CombiningState<Long, Long> underlyingValue = underlying.state(namespace,
stateTag);
- assertThat(underlyingValue.read(), equalTo(0L));
-
- underlyingValue.add(1L);
- assertThat(underlyingValue.read(), equalTo(1L));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- CombiningState<Long, Long> copyOnAccessState = internals.state(namespace,
stateTag);
- assertThat(copyOnAccessState.read(), equalTo(1L));
-
- copyOnAccessState.add(4L);
- assertThat(copyOnAccessState.read(), equalTo(5L));
- assertThat(underlyingValue.read(), equalTo(1L));
-
- CombiningState<Long, Long> reReadUnderlyingValue =
underlying.state(namespace, stateTag);
- assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
- }
-
- @Test
- public void testWatermarkHoldStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
- OutputTimeFn<BoundedWindow> outputTimeFn =
- OutputTimeFns.outputAtEarliestInputTimestamp();
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =
- StateTags.watermarkStateInternal("wmstate", outputTimeFn);
- WatermarkHoldState<?> underlyingValue = underlying.state(namespace,
stateTag);
- assertThat(underlyingValue.read(), nullValue());
-
- underlyingValue.add(new Instant(250L));
- assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- WatermarkHoldState<BoundedWindow> copyOnAccessState =
internals.state(namespace, stateTag);
- assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
-
- copyOnAccessState.add(new Instant(100L));
- assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
- assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
-
- copyOnAccessState.add(new Instant(500L));
- assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
-
- WatermarkHoldState<BoundedWindow> reReadUnderlyingValue =
- underlying.state(namespace, stateTag);
- assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
- }
-
- @Test
- public void testCommitWithoutUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = internals.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("bar");
- stringBag.add("baz");
- assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
-
- internals.commit();
-
- BagState<String> reReadStringBag = internals.state(namespace, bagTag);
- assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
- assertThat(internals.isEmpty(), is(false));
- }
-
- @Test
- public void testCommitWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = underlying.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("bar");
- stringBag.add("baz");
-
- internals.commit();
- BagState<String> reReadStringBag = internals.state(namespace, bagTag);
- assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
-
- reReadStringBag.add("spam");
-
- BagState<String> underlyingState = underlying.state(namespace, bagTag);
- assertThat(underlyingState.read(), containsInAnyOrder("spam", "bar",
"baz"));
- assertThat(underlyingState, is(theInstance(stringBag)));
- assertThat(internals.isEmpty(), is(false));
- }
-
- @Test
- public void testCommitWithClearedInUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
- spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key,
underlying));
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key,
secondUnderlying);
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = underlying.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("bar");
- stringBag.add("baz");
- stringBag.clear();
- // We should not read through the cleared bag
- secondUnderlying.commit();
-
- // Should not be visible
- stringBag.add("foo");
-
- internals.commit();
- BagState<String> internalsStringBag = internals.state(namespace, bagTag);
- assertThat(internalsStringBag.read(), emptyIterable());
- verify(secondUnderlying, never()).state(namespace, bagTag);
- assertThat(internals.isEmpty(), is(false));
- }
-
- @Test
- public void testCommitWithOverwrittenUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = underlying.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("bar");
- stringBag.add("baz");
-
- BagState<String> internalsState = internals.state(namespace, bagTag);
- internalsState.add("eggs");
- internalsState.add("ham");
- internalsState.add("0x00ff00");
- internalsState.add("&");
-
- internals.commit();
-
- BagState<String> reReadInternalState = internals.state(namespace, bagTag);
- assertThat(
- reReadInternalState.read(),
- containsInAnyOrder("bar", "baz", "0x00ff00", "eggs", "&", "ham"));
- BagState<String> reReadUnderlyingState = underlying.state(namespace,
bagTag);
- assertThat(reReadUnderlyingState.read(), containsInAnyOrder("bar", "baz"));
- }
-
- @Test
- public void testCommitWithAddedUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
- internals.commit();
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = underlying.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("bar");
- stringBag.add("baz");
-
- BagState<String> internalState = internals.state(namespace, bagTag);
- assertThat(internalState.read(), emptyIterable());
-
- BagState<String> reReadUnderlyingState = underlying.state(namespace,
bagTag);
- assertThat(reReadUnderlyingState.read(), containsInAnyOrder("bar", "baz"));
- }
-
- @Test
- public void testCommitWithEmptyTableIsEmpty() {
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
- internals.commit();
-
- assertThat(internals.isEmpty(), is(true));
- }
-
- @Test
- public void testCommitWithOnlyClearedValuesIsEmpty() {
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = internals.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("foo");
- stringBag.clear();
-
- internals.commit();
-
- assertThat(internals.isEmpty(), is(true));
- }
-
- @Test
- public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo",
StringUtf8Coder.of());
- BagState<String> stringBag = underlying.state(namespace, bagTag);
- assertThat(stringBag.read(), emptyIterable());
-
- stringBag.add("bar");
- stringBag.add("baz");
-
- internals.commit();
- assertThat(internals.isEmpty(), is(false));
- }
-
- @Test
- public void testGetEarliestWatermarkHoldAfterCommit() {
- BoundedWindow first = new BoundedWindow() {
- @Override
- public Instant maxTimestamp() {
- return new Instant(2048L);
- }
- };
- BoundedWindow second = new BoundedWindow() {
- @Override
- public Instant maxTimestamp() {
- return new Instant(689743L);
- }
- };
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo",
OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
- internals.state(StateNamespaces.window(null, first), firstHoldAddress);
- firstHold.add(new Instant(22L));
-
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo",
OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
- internals.state(StateNamespaces.window(null, second),
secondHoldAddress);
- secondHold.add(new Instant(2L));
-
- internals.commit();
- assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(2L)));
- }
-
- @Test
- public void testGetEarliestWatermarkHoldWithEarliestInUnderlyingTable() {
- BoundedWindow first = new BoundedWindow() {
- @Override
- public Instant maxTimestamp() {
- return new Instant(2048L);
- }
- };
- BoundedWindow second = new BoundedWindow() {
- @Override
- public Instant maxTimestamp() {
- return new Instant(689743L);
- }
- };
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo",
OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
- underlying.state(StateNamespaces.window(null, first),
firstHoldAddress);
- firstHold.add(new Instant(22L));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying("foo",
underlying.commit());
-
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo",
OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
- internals.state(StateNamespaces.window(null, second),
secondHoldAddress);
- secondHold.add(new Instant(244L));
-
- internals.commit();
- assertThat(internals.getEarliestWatermarkHold(), equalTo(new
Instant(22L)));
- }
-
- @Test
- public void testGetEarliestWatermarkHoldWithEarliestInNewTable() {
- BoundedWindow first =
- new BoundedWindow() {
- @Override
- public Instant maxTimestamp() {
- return new Instant(2048L);
- }
- };
- BoundedWindow second =
- new BoundedWindow() {
- @Override
- public Instant maxTimestamp() {
- return new Instant(689743L);
- }
- };
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo",
OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
- underlying.state(StateNamespaces.window(null, first),
firstHoldAddress);
- firstHold.add(new Instant(224L));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying("foo",
underlying.commit());
-
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo",
OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
- internals.state(StateNamespaces.window(null, second),
secondHoldAddress);
- secondHold.add(new Instant(24L));
-
- internals.commit();
- assertThat(internals.getEarliestWatermarkHold(), equalTo(new
Instant(24L)));
- }
-
- @Test
- public void testGetEarliestHoldBeforeCommit() {
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
- internals
- .state(
- StateNamespaces.global(),
- StateTags.watermarkStateInternal("foo",
OutputTimeFns.outputAtEarliestInputTimestamp()))
- .add(new Instant(1234L));
-
- thrown.expect(IllegalStateException.class);
-
thrown.expectMessage(CopyOnAccessInMemoryStateInternals.class.getSimpleName());
- thrown.expectMessage("Can't get the earliest watermark hold");
- thrown.expectMessage("before it is committed");
-
- internals.getEarliestWatermarkHold();
- }
-}