http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java index 9552325..f0eb53e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.Preconditions; import java.io.ByteArrayOutputStream; @@ -39,8 +40,8 @@ import java.util.Map; * @param <V> The type of the value. */ public class HeapListState<K, N, V> - extends AbstractHeapState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> - implements ListState<V> { + extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> + implements InternalListState<N, V> { /** * Creates a new key/value state for the given hash map of key/value pairs. @@ -59,6 +60,10 @@ public class HeapListState<K, N, V> super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); } + // ------------------------------------------------------------------------ + // state access + // ------------------------------------------------------------------------ + @Override public Iterable<V> get() { Preconditions.checkState(currentNamespace != null, "No namespace set."); @@ -154,4 +159,14 @@ public class HeapListState<K, N, V> return baos.toByteArray(); } + + // ------------------------------------------------------------------------ + // state merging + // ------------------------------------------------------------------------ + + @Override + protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) { + a.addAll(b); + return a; + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java index 37aa812..7804cb4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -37,8 +38,8 @@ import java.util.Map; * @param <V> The type of the value. */ public class HeapReducingState<K, N, V> - extends AbstractHeapState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> - implements ReducingState<V> { + extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>> + implements InternalReducingState<N, V> { private final ReduceFunction<V> reduceFunction; @@ -56,10 +57,15 @@ public class HeapReducingState<K, N, V> StateTable<K, N, V> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) { + super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); this.reduceFunction = stateDesc.getReduceFunction(); } + // ------------------------------------------------------------------------ + // state access + // ------------------------------------------------------------------------ + @Override public V get() { Preconditions.checkState(currentNamespace != null, "No namespace set."); @@ -111,13 +117,22 @@ public class HeapReducingState<K, N, V> if (currentValue == null) { // we're good, just added the new value } else { - V reducedValue = null; + V reducedValue; try { reducedValue = reduceFunction.reduce(currentValue, value); } catch (Exception e) { - throw new RuntimeException("Could not add value to reducing state.", e); + throw new IOException("Exception while applying ReduceFunction in reducing state", e); } keyedMap.put(backend.<K>getCurrentKey(), reducedValue); } } + + // ------------------------------------------------------------------------ + // state merging + // ------------------------------------------------------------------------ + + @Override + protected V mergeState(V a, V b) throws Exception { + return reduceFunction.reduce(a, b); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java index cccaacb..9e042fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.util.Preconditions; import java.util.Map; @@ -36,7 +37,7 @@ import java.util.Map; */ public class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>> - implements ValueState<V> { + implements InternalValueState<N, V> { /** * Creates a new key/value state for the given hash map of key/value pairs. http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java index 797150a..9d7232e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java @@ -17,6 +17,7 @@ */ package org.apache.flink.runtime.state.heap; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; import org.apache.flink.runtime.state.KeyGroupRange; @@ -78,4 +79,21 @@ public class StateTable<K, N, ST> { public List<Map<N, Map<K, ST>>> getState() { return state; } + + // ------------------------------------------------------------------------ + // for testing + // ------------------------------------------------------------------------ + + @VisibleForTesting + boolean isEmpty() { + for (Map<N, Map<K, ST>> map : state) { + if (map != null) { + if (!map.isEmpty()) { + return false; + } + } + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java new file mode 100644 index 0000000..ae9f457 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.internal; + +import org.apache.flink.api.common.state.AppendingState; + +/** + * The peer to the {@link AppendingState} in the internal state type hierarchy. + * + * <p>See {@link InternalKvState} for a description of the internal state hierarchy. + * + * @param <N> The type of the namespace + * @param <IN> The type of elements added to the state + * @param <OUT> The type of the + */ +public interface InternalAppendingState<N, IN, OUT> extends InternalKvState<N>, AppendingState<IN, OUT> {} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java new file mode 100644 index 0000000..eb58ce5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.internal; + +import org.apache.flink.api.common.state.FoldingState; + +/** + * The peer to the {@link FoldingState} in the internal state type hierarchy. + * + * <p>See {@link InternalKvState} for a description of the internal state hierarchy. + * + * @param <N> The type of the namespace + * @param <T> Type of the values folded into the state + * @param <ACC> Type of the value in the state + */ +public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java new file mode 100644 index 0000000..06f64b6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.internal; + +import org.apache.flink.api.common.state.State; + +/** + * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the + * {@link State} being the root of the public API state hierarchy. + * + * <p>The internal state classes give access to the namespace getters and setters and access to + * additional functionality, like raw value access or state merging. + * + * <p>The public API state hierarchy is intended to be programmed against by Flink applications. + * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not + * intended to be used by user applications. These internal methods are considered of limited use to users and + * only confusing, and are usually not regarded as stable across releases. + * + * <p>Each specific type in the internal state hierarchy extends the type from the public + * state hierarchy: + * + * <pre> + * State + * | + * +-------------------InternalKvState + * | | + * MergingState | + * | | + * +-----------------InternalMergingState + * | | + * +--------+------+ | + * | | | + * ReducingState ListState +-----+-----------------+ + * | | | | + * +-----------+ +----------- -----------------InternalListState + * | | + * +---------InternalReducingState + * </pre> + * + * @param <N> The type of the namespace. + */ +public interface InternalKvState<N> extends State { + + /** + * Sets the current namespace, which will be used when using the state access methods. + * + * @param namespace The namespace. + */ + void setCurrentNamespace(N namespace); + + /** + * Returns the serialized value for the given key and namespace. + * + * <p>If no value is associated with key and namespace, <code>null</code> + * is returned. + * + * @param serializedKeyAndNamespace Serialized key and namespace + * @return Serialized value or <code>null</code> if no value is associated with the key and namespace. + * + * @throws Exception Exceptions during serialization are forwarded + */ + byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java new file mode 100644 index 0000000..ae392ed --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.internal; + +import org.apache.flink.api.common.state.ListState; + +/** + * The peer to the {@link ListState} in the internal state type hierarchy. + * + * <p>See {@link InternalKvState} for a description of the internal state hierarchy. + * + * @param <N> The type of the namespace + * @param <T> The type of elements in the list + */ +public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java new file mode 100644 index 0000000..abc7d7c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.internal; + +import org.apache.flink.api.common.state.MergingState; + +import java.util.Collection; + +/** + * The peer to the {@link MergingState} in the internal state type hierarchy. + * + * See {@link InternalKvState} for a description of the internal state hierarchy. + * + * @param <N> The type of the namespace + * @param <IN> The type of elements added to the state + * @param <OUT> The type of elements + */ +public interface InternalMergingState<N, IN, OUT> extends InternalAppendingState<N, IN, OUT>, MergingState<IN, OUT> { + + /** + * Merges the state of the current key for the given source namespaces into the state of + * the target namespace. + * + * @param target The target namespace where the merged state should be stored. + * @param sources The source namespaces whose state should be merged. + * + * @throws Exception The method may forward exception thrown internally (by I/O or functions). + */ + void mergeNamespaces(N target, Collection<N> sources) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java new file mode 100644 index 0000000..40e625c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.internal; + +import org.apache.flink.api.common.state.ReducingState; + +/** + * The peer to the {@link ReducingState} in the internal state type hierarchy. + * + * <p>See {@link InternalKvState} for a description of the internal state hierarchy. + * + * @param <N> The type of the namespace + * @param <T> The type of elements in the list + */ +public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java new file mode 100644 index 0000000..7177b8a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.internal; + +import org.apache.flink.api.common.state.ValueState; + +/** + * The peer to the {@link ValueState} in the internal state type hierarchy. + * + * <p>See {@link InternalKvState} for a description of the internal state hierarchy. + * + * @param <N> The type of the namespace + * @param <T> The type of elements in the list + */ +public interface InternalValueState<N, T> extends InternalKvState<N>, ValueState<T> {} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java index 86f8766..1c02ca1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java @@ -43,8 +43,8 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestType; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -593,7 +593,7 @@ public class KvStateClientTest { state.update(201 + i); // we know it must be a KvStat but this is not exposed to the user via State - KvState<?> kvState = (KvState<?>) state; + InternalKvState<?> kvState = (InternalKvState<?>) state; // Register KvState (one state instance for all server) ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState); http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java index b1ec86f..202024c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java @@ -40,9 +40,9 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestType; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -267,7 +267,7 @@ public class KvStateServerHandlerTest extends TestLogger { } /** - * Tests the failure response on a failure on the {@link KvState#getSerializedValue(byte[])} + * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} * call. */ @Test @@ -279,7 +279,7 @@ public class KvStateServerHandlerTest extends TestLogger { EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); // Failing KvState - KvState<?> kvState = mock(KvState.class); + InternalKvState<?> kvState = mock(InternalKvState.class); when(kvState.getSerializedValue(any(byte[].class))) .thenThrow(new RuntimeException("Expected test Exception")); http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java index 0d9c2e4..69dbe6f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.query.netty.message; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; -import org.apache.flink.api.common.state.ListState; + import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -30,10 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.internal.InternalListState; + import org.junit.Test; import java.io.IOException; @@ -315,7 +317,7 @@ public class KvStateRequestSerializerTest { */ @Test public void testListSerialization() throws Exception { - final long key = 0l; + final long key = 0L; // objects for heap state list serialisation final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = @@ -327,9 +329,10 @@ public class KvStateRequestSerializerTest { ); longHeapKeyedStateBackend.setCurrentKey(key); - final ListState<Long> listState = longHeapKeyedStateBackend - .createListState(VoidNamespaceSerializer.INSTANCE, + final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState( + VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); + testListSerialization(key, listState); } @@ -340,19 +343,16 @@ public class KvStateRequestSerializerTest { * @param key * key of the list state * @param listState - * list state using the {@link VoidNamespace}, must also be a {@link - * KvState} instance + * list state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance * * @throws Exception */ - public static void testListSerialization(final long key, - final ListState<Long> listState) throws Exception { + public static void testListSerialization( + final long key, + final InternalListState<VoidNamespace, Long> listState) throws Exception { TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE; - - final KvState<VoidNamespace> listKvState = - (KvState<VoidNamespace>) listState; - listKvState.setCurrentNamespace(VoidNamespace.INSTANCE); + listState.setCurrentNamespace(VoidNamespace.INSTANCE); // List final int numElements = 10; @@ -368,8 +368,8 @@ public class KvStateRequestSerializerTest { KvStateRequestSerializer.serializeKeyAndNamespace( key, LongSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); - final byte[] serializedValues = - listKvState.getSerializedValue(serializedKey); + + final byte[] serializedValues = listState.getSerializedValue(serializedKey); List<Long> actualValues = KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer); assertEquals(expectedValues, actualValues); http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 38e04aa..c560ab0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.query.KvStateRegistryListener; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.heap.AbstractHeapState; import org.apache.flink.runtime.state.heap.StateTable; +import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.types.IntValue; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -167,7 +168,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state; + InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state; // some modifications to the state backend.setCurrentKey(1); @@ -214,7 +215,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1; + InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1; backend.setCurrentKey(1); assertEquals("1", restored1.value()); @@ -230,7 +231,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ValueState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2; + InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2; backend.setCurrentKey(1); assertEquals("u1", restored2.value()); @@ -246,7 +247,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } /** - * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])} + * Tests {@link ValueState#value()} and {@link InternalKvState#getSerializedValue(byte[])} * accessing the state concurrently. They should not get in the way of each * other. */ @@ -255,7 +256,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten public void testValueStateRace() throws Exception { final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); - final Integer namespace = Integer.valueOf(1); + final Integer namespace = 1; final ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class); @@ -270,7 +271,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - final KvState<Integer> kvState = (KvState<Integer>) state; + final InternalKvState<Integer> kvState = (InternalKvState<Integer>) state; /** * 1) Test that ValueState#value() before and after @@ -496,7 +497,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state; + InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state; Joiner joiner = Joiner.on(","); // some modifications to the state @@ -544,7 +545,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1; + InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1; backend.setCurrentKey(1); assertEquals("1", joiner.join(restored1.get())); @@ -560,7 +561,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2; + InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2; backend.setCurrentKey(1); assertEquals("1,u1", joiner.join(restored2.get())); @@ -596,7 +597,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state; + InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state; // some modifications to the state backend.setCurrentKey(1); @@ -643,7 +644,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1; + InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1; backend.setCurrentKey(1); assertEquals("1", restored1.get()); @@ -659,7 +660,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2; + InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2; backend.setCurrentKey(1); assertEquals("1,u1", restored2.get()); @@ -698,7 +699,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state; + InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state; // some modifications to the state backend.setCurrentKey(1); @@ -746,7 +747,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1; + InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1; backend.setCurrentKey(1); assertEquals("Fold-Initial:,1", restored1.get()); @@ -763,7 +764,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @SuppressWarnings("unchecked") FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2; + InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2; backend.setCurrentKey(1); assertEquals("Fold-Initial:,101", restored2.get()); @@ -1254,7 +1255,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten VoidNamespaceSerializer.INSTANCE, desc); - KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state; + InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state; assertTrue(kvState instanceof AbstractHeapState); kvState.setCurrentNamespace(VoidNamespace.INSTANCE); @@ -1280,7 +1281,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten VoidNamespaceSerializer.INSTANCE, desc); - KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state; + InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state; assertTrue(kvState instanceof AbstractHeapState); kvState.setCurrentNamespace(VoidNamespace.INSTANCE); @@ -1311,7 +1312,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten VoidNamespaceSerializer.INSTANCE, desc); - KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state; + InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state; assertTrue(kvState instanceof AbstractHeapState); kvState.setCurrentNamespace(VoidNamespace.INSTANCE); @@ -1342,7 +1343,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten VoidNamespaceSerializer.INSTANCE, desc); - KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state; + InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state; assertTrue(kvState instanceof AbstractHeapState); kvState.setCurrentNamespace(VoidNamespace.INSTANCE); @@ -1451,7 +1452,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten * if it is not null. */ private static <V, K, N> V getSerializedValue( - KvState<N> kvState, + InternalKvState<N> kvState, K key, TypeSerializer<K> keySerializer, N namespace, @@ -1475,7 +1476,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten * if it is not null. */ private static <V, K, N> List<V> getSerializedList( - KvState<N> kvState, + InternalKvState<N> kvState, K key, TypeSerializer<K> keySerializer, N namespace, http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java new file mode 100644 index 0000000..33d60a0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.ExecutionConfig; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Tests for the simple Java heap objects implementation of the {@link ListState}. + */ +public class HeapListStateTest { + + @Test + public void testAddAndGet() throws Exception { + + final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(); + + try { + InternalListState<VoidNamespace, Long> state = + keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr); + state.setCurrentNamespace(VoidNamespace.INSTANCE); + + keyedBackend.setCurrentKey("abc"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("def"); + assertNull(state.get()); + state.add(17L); + state.add(11L); + assertEquals(asList(17L, 11L), state.get()); + + keyedBackend.setCurrentKey("abc"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + assertNull(state.get()); + state.add(1L); + state.add(2L); + + keyedBackend.setCurrentKey("def"); + assertEquals(asList(17L, 11L), state.get()); + state.clear(); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + state.add(3L); + state.add(2L); + state.add(1L); + + keyedBackend.setCurrentKey("def"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get()); + state.clear(); + + // make sure all lists / maps are cleared + + StateTable<String, VoidNamespace, ArrayList<Long>> stateTable = + ((HeapListState<String, VoidNamespace, Long>) state).stateTable; + + assertTrue(stateTable.isEmpty()); + } + finally { + keyedBackend.close(); + keyedBackend.dispose(); + } + } + + @Test + public void testMerging() throws Exception { + + final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + final Integer namespace1 = 1; + final Integer namespace2 = 2; + final Integer namespace3 = 3; + + final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L)); + + final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(); + + try { + InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr); + + // populate the different namespaces + // - abc spreads the values over three namespaces + // - def spreads teh values over two namespaces (one empty) + // - ghi is empty + // - jkl has all elements already in the target namespace + // - mno has all elements already in one source namespace + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + state.add(33L); + state.add(55L); + + state.setCurrentNamespace(namespace2); + state.add(22L); + state.add(11L); + + state.setCurrentNamespace(namespace3); + state.add(44L); + + keyedBackend.setCurrentKey("def"); + state.setCurrentNamespace(namespace1); + state.add(11L); + state.add(44L); + + state.setCurrentNamespace(namespace3); + state.add(22L); + state.add(55L); + state.add(33L); + + keyedBackend.setCurrentKey("jkl"); + state.setCurrentNamespace(namespace1); + state.add(11L); + state.add(22L); + state.add(33L); + state.add(44L); + state.add(55L); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace3); + state.add(11L); + state.add(22L); + state.add(33L); + state.add(44L); + state.add(55L); + + keyedBackend.setCurrentKey("abc"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + validateResult(state.get(), expectedResult); + + keyedBackend.setCurrentKey("def"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + validateResult(state.get(), expectedResult); + + keyedBackend.setCurrentKey("ghi"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertNull(state.get()); + + keyedBackend.setCurrentKey("jkl"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + validateResult(state.get(), expectedResult); + + keyedBackend.setCurrentKey("mno"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + validateResult(state.get(), expectedResult); + + // make sure all lists / maps are cleared + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("def"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("ghi"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("jkl"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace1); + state.clear(); + + StateTable<String, Integer, ArrayList<Long>> stateTable = + ((HeapListState<String, Integer, Long>) state).stateTable; + + assertTrue(stateTable.isEmpty()); + } + finally { + keyedBackend.close(); + keyedBackend.dispose(); + } + } + + private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception { + return new HeapKeyedStateBackend<>( + mock(TaskKvStateRegistry.class), + StringSerializer.INSTANCE, + HeapListStateTest.class.getClassLoader(), + 16, + new KeyGroupRange(0, 15)); + } + + private static <T> void validateResult(Iterable<T> values, Set<T> expected) { + int num = 0; + for (T v : values) { + num++; + assertTrue(expected.contains(v)); + } + + assertEquals(expected.size(), num); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java new file mode 100644 index 0000000..e0929f1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalReducingState; + +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Tests for the simple Java heap objects implementation of the {@link ReducingState}. + */ +public class HeapReducingStateTest { + + @Test + public void testAddAndGet() throws Exception { + + final ReducingStateDescriptor<Long> stateDescr = + new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(); + + try { + InternalReducingState<VoidNamespace, Long> state = + keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr); + state.setCurrentNamespace(VoidNamespace.INSTANCE); + + keyedBackend.setCurrentKey("abc"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("def"); + assertNull(state.get()); + state.add(17L); + state.add(11L); + assertEquals(28L, state.get().longValue()); + + keyedBackend.setCurrentKey("abc"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + assertNull(state.get()); + state.add(1L); + state.add(2L); + + keyedBackend.setCurrentKey("def"); + assertEquals(28L, state.get().longValue()); + state.clear(); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + state.add(3L); + state.add(2L); + state.add(1L); + + keyedBackend.setCurrentKey("def"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + assertEquals(9L, state.get().longValue()); + state.clear(); + + // make sure all lists / maps are cleared + + StateTable<String, VoidNamespace, Long> stateTable = + ((HeapReducingState<String, VoidNamespace, Long>) state).stateTable; + + assertTrue(stateTable.isEmpty()); + } + finally { + keyedBackend.close(); + keyedBackend.dispose(); + } + } + + @Test + public void testMerging() throws Exception { + + final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>( + "my-state", new AddingFunction(), Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + final Integer namespace1 = 1; + final Integer namespace2 = 2; + final Integer namespace3 = 3; + + final Long expectedResult = 165L; + + final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(); + + try { + final InternalReducingState<Integer, Long> state = + keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr); + + // populate the different namespaces + // - abc spreads the values over three namespaces + // - def spreads teh values over two namespaces (one empty) + // - ghi is empty + // - jkl has all elements already in the target namespace + // - mno has all elements already in one source namespace + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + state.add(33L); + state.add(55L); + + state.setCurrentNamespace(namespace2); + state.add(22L); + state.add(11L); + + state.setCurrentNamespace(namespace3); + state.add(44L); + + keyedBackend.setCurrentKey("def"); + state.setCurrentNamespace(namespace1); + state.add(11L); + state.add(44L); + + state.setCurrentNamespace(namespace3); + state.add(22L); + state.add(55L); + state.add(33L); + + keyedBackend.setCurrentKey("jkl"); + state.setCurrentNamespace(namespace1); + state.add(11L); + state.add(22L); + state.add(33L); + state.add(44L); + state.add(55L); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace3); + state.add(11L); + state.add(22L); + state.add(33L); + state.add(44L); + state.add(55L); + + keyedBackend.setCurrentKey("abc"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertEquals(expectedResult, state.get()); + + keyedBackend.setCurrentKey("def"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertEquals(expectedResult, state.get()); + + keyedBackend.setCurrentKey("ghi"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertNull(state.get()); + + keyedBackend.setCurrentKey("jkl"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertEquals(expectedResult, state.get()); + + keyedBackend.setCurrentKey("mno"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertEquals(expectedResult, state.get()); + + // make sure all lists / maps are cleared + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("def"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("ghi"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("jkl"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace1); + state.clear(); + + StateTable<String, Integer, Long> stateTable = + ((HeapReducingState<String, Integer, Long>) state).stateTable; + + assertTrue(stateTable.isEmpty()); + } + finally { + keyedBackend.close(); + keyedBackend.dispose(); + } + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception { + return new HeapKeyedStateBackend<>( + mock(TaskKvStateRegistry.class), + StringSerializer.INSTANCE, + HeapReducingStateTest.class.getClassLoader(), + 16, + new KeyGroupRange(0, 15)); + } + + // ------------------------------------------------------------------------ + // test functions + // ------------------------------------------------------------------------ + + @SuppressWarnings("serial") + private static class AddingFunction implements ReduceFunction<Long> { + + @Override + public Long reduce(Long a, Long b) { + return a + b; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a21660c..6bb0a40 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -494,15 +494,34 @@ public abstract class AbstractStreamOperator<OUT> return getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor); } + + protected <N, S extends State, T> S getOrCreateKeyedState( + TypeSerializer<N> namespaceSerializer, + StateDescriptor<S, T> stateDescriptor) throws Exception { + + if (keyedStateStore != null) { + return keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor); + } + else { + throw new IllegalStateException("Cannot create partitioned state. " + + "The keyed state backend has not been set." + + "This indicates that the operator is not partitioned/keyed."); + } + } + /** * Creates a partitioned state handle, using the state backend configured for this task. + * + * TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace. + * This method should be removed for the sake of namespaces being lazily fetched from the keyed + * state backend, or being set on the state directly. * * @throws IllegalStateException Thrown, if the key/value state was already initialized. * @throws Exception Thrown, if the state backend cannot create the key/value state. */ - @SuppressWarnings("unchecked") protected <S extends State, N> S getPartitionedState( - N namespace, TypeSerializer<N> namespaceSerializer, + N namespace, + TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception { if (keyedStateStore != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java index 0d5d091..05c89dd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -35,7 +35,7 @@ import java.util.Set; /** * A {@link Window} that represents a time interval from {@code start} (inclusive) to - * {@code start + size} (exclusive). + * {@code end} (exclusive). */ @PublicEvolving public class TimeWindow extends Window { @@ -48,14 +48,35 @@ public class TimeWindow extends Window { this.end = end; } + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs + * to this window. + * + * @return The starting timestamp of this window. + */ public long getStart() { return start; } + /** + * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it + * is the first timestamp that does not belong to this window any more. + * + * @return The exclusive end timestamp of this window. + */ public long getEnd() { return end; } + /** + * Gets the largest timestamp that still belongs to this window. + * + * <p>This timestamp is identical to {@code getEnd() - 1}. + * + * @return The largest timestamp that still belongs to this window. + * + * @see #getEnd() + */ @Override public long maxTimestamp() { return end - 1; @@ -104,6 +125,13 @@ public class TimeWindow extends Window { return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end)); } + // ------------------------------------------------------------------------ + // Serializer + // ------------------------------------------------------------------------ + + /** + * The serializer used to write the TimeWindow type. + */ public static class Serializer extends TypeSerializer<TimeWindow> { private static final long serialVersionUID = 1L; @@ -152,9 +180,7 @@ public class TimeWindow extends Window { @Override public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException { - long start = source.readLong(); - long end = source.readLong(); - return new TimeWindow(start, end); + return deserialize(source); } @Override @@ -179,6 +205,10 @@ public class TimeWindow extends Window { } } + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + /** * Merge overlapping {@link TimeWindow}s. For use by merging * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}. http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java index 0e131ff..b17989c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java @@ -31,5 +31,10 @@ import org.apache.flink.annotation.PublicEvolving; @PublicEvolving public abstract class Window { + /** + * Gets the largest timestamp that still belongs to this window. + * + * @return The largest timestamp that still belongs to this window. + */ public abstract long maxTimestamp(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 8c73878..d9c977a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -20,15 +20,16 @@ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; + import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.AppendingState; -import org.apache.flink.api.common.state.MergingState; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; @@ -41,7 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Collection; -import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * A {@link WindowOperator} that also allows an {@link Evictor} to be used. @@ -56,40 +57,52 @@ import static java.util.Objects.requireNonNull; * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ @Internal -public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> { +public class EvictingWindowOperator<K, IN, OUT, W extends Window> + extends WindowOperator<K, IN, Iterable<IN>, OUT, W> { private static final long serialVersionUID = 1L; + // ------------------------------------------------------------------------ + // these fields are set by the API stream graph builder to configure the operator + private final Evictor<? super IN, ? super W> evictor; + private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> evictingWindowStateDescriptor; + + // ------------------------------------------------------------------------ + // the fields below are instantiated once the operator runs in the runtime + private transient EvictorContext evictorContext; - private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor; + private transient InternalListState<W, StreamRecord<IN>> evictingWindowState; + + // ------------------------------------------------------------------------ public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, - TypeSerializer<W> windowSerializer, - KeySelector<IN, K> keySelector, - TypeSerializer<K> keySerializer, - StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor, - InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction, - Trigger<? super IN, ? super W> trigger, - Evictor<? super IN, ? super W> evictor, - long allowedLateness) { + TypeSerializer<W> windowSerializer, + KeySelector<IN, K> keySelector, + TypeSerializer<K> keySerializer, + StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor, + InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction, + Trigger<? super IN, ? super W> trigger, + Evictor<? super IN, ? super W> evictor, + long allowedLateness) { super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger, allowedLateness); - this.evictor = requireNonNull(evictor); - this.windowStateDescriptor = windowStateDescriptor; + + this.evictor = checkNotNull(evictor); + this.evictingWindowStateDescriptor = checkNotNull(windowStateDescriptor); } @Override - @SuppressWarnings("unchecked") public void processElement(StreamRecord<IN> element) throws Exception { Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); + @SuppressWarnings("unchecked") final K key = (K) getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { @@ -119,11 +132,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window } // merge the merged state windows into the newly resulting state window - getKeyedStateBackend().mergePartitionedStates( - stateWindowResult, - mergedStateWindows, - windowSerializer, - (StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor); + evictingWindowState.mergeNamespaces(stateWindowResult, mergedStateWindows); } }); @@ -137,9 +146,9 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } - ListState<StreamRecord<IN>> windowState = getPartitionedState( - stateWindow, windowSerializer, windowStateDescriptor); - windowState.add(element); + + evictingWindowState.setCurrentNamespace(stateWindow); + evictingWindowState.add(element); context.key = key; context.window = actualWindow; @@ -149,16 +158,16 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window TriggerResult triggerResult = context.onElement(element); if (triggerResult.isFire()) { - Iterable<StreamRecord<IN>> contents = windowState.get(); + Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); if (contents == null) { // if we have no state, there is nothing to do continue; } - fire(actualWindow, contents, windowState); + fire(actualWindow, contents, evictingWindowState); } if (triggerResult.isPurge()) { - cleanup(actualWindow, windowState, mergingWindows); + cleanup(actualWindow, evictingWindowState, mergingWindows); } else { registerCleanupTimer(actualWindow); } @@ -173,9 +182,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window continue; } - ListState<StreamRecord<IN>> windowState = getPartitionedState( - window, windowSerializer, windowStateDescriptor); - windowState.add(element); + evictingWindowState.setCurrentNamespace(window); + evictingWindowState.add(element); context.key = key; context.window = window; @@ -185,16 +193,16 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window TriggerResult triggerResult = context.onElement(element); if (triggerResult.isFire()) { - Iterable<StreamRecord<IN>> contents = windowState.get(); + Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); if (contents == null) { // if we have no state, there is nothing to do continue; } - fire(window, contents, windowState); + fire(window, contents, evictingWindowState); } if (triggerResult.isPurge()) { - cleanup(window, windowState, null); + cleanup(window, evictingWindowState, null); } else { registerCleanupTimer(window); } @@ -222,15 +230,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window // so it is safe to just ignore return; } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + + evictingWindowState.setCurrentNamespace(stateWindow); } else { - windowState = getPartitionedState( - context.window, - windowSerializer, - windowStateDescriptor); + evictingWindowState.setCurrentNamespace(context.window); } - Iterable<StreamRecord<IN>> contents = windowState.get(); + Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); if (contents == null) { // if we have no state, there is nothing to do return; @@ -238,11 +244,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); if (triggerResult.isFire()) { - fire(context.window, contents, windowState); + fire(context.window, contents, evictingWindowState); } if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { - cleanup(context.window, windowState, mergingWindows); + cleanup(context.window, evictingWindowState, mergingWindows); } } @@ -265,12 +271,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window // so it is safe to just ignore return; } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + evictingWindowState.setCurrentNamespace(stateWindow); } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); + evictingWindowState.setCurrentNamespace(context.window); } - Iterable<StreamRecord<IN>> contents = windowState.get(); + Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); if (contents == null) { // if we have no state, there is nothing to do return; @@ -278,11 +284,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); if (triggerResult.isFire()) { - fire(context.window, contents, windowState); + fire(context.window, contents, evictingWindowState); } if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { - cleanup(context.window, windowState, mergingWindows); + cleanup(context.window, evictingWindowState, mergingWindows); } } @@ -381,7 +387,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window @Override public void open() throws Exception { super.open(); + evictorContext = new EvictorContext(null,null); + evictingWindowState = (InternalListState<W, StreamRecord<IN>>) + getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor); } @Override @@ -409,6 +418,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window @VisibleForTesting @SuppressWarnings("unchecked, rawtypes") public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() { - return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) windowStateDescriptor; + return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) evictingWindowStateDescriptor; } }