http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
----------------------------------------------------------------------
diff --git
a/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
b/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
new file mode 100644
index 0000000..f11299e
--- /dev/null
+++
b/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import java.util
+import java.util.{Iterator, Map}
+
+import com.google.common.collect.Table
+import org.apache.gearpump.streaming.refactor.coder.StringUtf8Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, SetState,
ValueState}
+import org.apache.gearpump.streaming.refactor.state.{StateNamespaces,
StateTags}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class HeapStateInternalsSpec
+ extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+ property("HeapStateInternalsProxy should return correct key coder") {
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory: HeapStateInternalsFactory[String] =
+ new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+ val proxy: HeapStateInternalsProxy[String] = new
HeapStateInternalsProxy[String](factory)
+
+ factory.getKeyCoder shouldBe StringUtf8Coder.of
+ }
+
+ // region value state
+ property("test heap value state: write heap state should equals read state")
{
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val value = "hello world"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+ val stateInternals = factory.stateInternalsForKey(key)
+ val valueState = stateInternals.state[ValueState[String]](namespace,
+ StateTags.value(stateId, StringUtf8Coder.of))
+
+ valueState.write(value)
+ valueState.read shouldBe value
+ }
+
+ property("test heap value state: write heap state should not equals read
state " +
+ "for different state id") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val newStateId = "02"
+ implicit val value = "hello world"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+ val stateInternals = factory.stateInternalsForKey(key)
+ val valueState = stateInternals.state[ValueState[String]](namespace,
+ StateTags.value(stateId, StringUtf8Coder.of))
+
+ valueState.write(value)
+
+ val newValueState = stateInternals.state[ValueState[String]](namespace,
+ StateTags.value(newStateId, StringUtf8Coder.of))
+ newValueState.read shouldNot be(value)
+ }
+
+ property("test heap value state: write heap state should equals read state "
+
+ "for different key") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val value = "hello world"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val valueState = stateInternals.state[ValueState[String]](namespace,
+ StateTags.value(stateId, StringUtf8Coder.of))
+
+ val newStateInternals = factory.stateInternalsForKey(newKey)
+ val newValueState = newStateInternals.state[ValueState[String]](namespace,
+ StateTags.value(stateId, StringUtf8Coder.of))
+
+ valueState.write(value)
+ newValueState.read shouldNot be(value)
+ }
+ // endregion
+
+ // region bag state
+ property("test heap Bag state: write heap state should equals read state") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val bagValue1 = "bagValue1"
+ implicit val bagValue2 = "bagValue2"
+ implicit val stateId = "01"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val bagState = stateInternals.state[BagState[String]](namespace,
+ StateTags.bag(stateId, StringUtf8Coder.of))
+
+ bagState.add(bagValue1)
+ bagState.add(bagValue2)
+
+ val bagIterator: Iterator[String] = bagState.read.iterator()
+
+ implicit var counter = 0
+
+ while (bagIterator.hasNext) {
+ counter += 1
+ if (counter == 1) {
+ bagIterator.next() shouldBe bagValue1
+ }
+ if (counter == 2) {
+ bagIterator.next() shouldBe bagValue2
+ }
+ }
+
+ counter shouldBe 2
+ }
+
+ property("test heap Bag state: write heap state should not equal read state
with " +
+ "different key") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val bagValue1 = "bagValue1"
+ implicit val bagValue2 = "bagValue2"
+ implicit val stateId = "01"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val newStateInternals = factory.stateInternalsForKey(newKey)
+
+ val bagState = stateInternals.state[BagState[String]](namespace,
+ StateTags.bag(stateId, StringUtf8Coder.of))
+ val newBagState = newStateInternals.state[BagState[String]](namespace,
+ StateTags.bag(stateId, StringUtf8Coder.of))
+
+ bagState.add(bagValue1)
+ bagState.add(bagValue2)
+
+ val newBagIterator: Iterator[String] = newBagState.read.iterator()
+
+ implicit var counter = 0
+
+ while (newBagIterator.hasNext) {
+ counter += 1
+ newBagIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+
+ property("test heap Bag state: write heap state should not equal read state
" +
+ "with different stateId") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val bagValue1 = "bagValue1"
+ implicit val bagValue2 = "bagValue2"
+ implicit val stateId = "01"
+ implicit val newStateId = "02"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val bagState = stateInternals.state[BagState[String]](namespace,
+ StateTags.bag(stateId, StringUtf8Coder.of))
+ val newBagState = stateInternals.state[BagState[String]](namespace,
+ StateTags.bag(newStateId, StringUtf8Coder.of))
+
+ bagState.add(bagValue1)
+ bagState.add(bagValue2)
+
+ val newBagIterator: Iterator[String] = newBagState.read.iterator()
+
+ implicit var counter = 0
+
+ while (newBagIterator.hasNext) {
+ counter += 1
+ newBagIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+ // endregion
+
+ // region set state
+ property("test heap set state, generic methods") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val setValue1 = "setValue1"
+ implicit val setValue2 = "setValue2"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val setState = stateInternals.state[SetState[String]](namespace,
+ StateTags.set(stateId, StringUtf8Coder.of))
+
+ setState.add(setValue1)
+ setState.add(setValue2)
+
+ implicit var setStateIterator = setState.read.iterator()
+
+ implicit var counter = 0
+ while (setStateIterator.hasNext) {
+ counter += 1
+ setStateIterator.next()
+ }
+
+ counter shouldBe 2
+
+ setState.addIfAbsent(setValue2).read shouldBe false
+
+ setStateIterator = setState.read.iterator()
+
+ counter = 0
+ while (setStateIterator.hasNext) {
+ counter += 1
+ setStateIterator.next()
+ }
+
+ counter shouldBe 2
+
+ setState.contains(setValue1).read shouldBe true
+ setState.contains("setValue03").read shouldBe false
+
+ setState.isEmpty.read shouldBe false
+
+ setState.remove(setValue1)
+ setState.remove(setValue2)
+
+ setState.isEmpty.read shouldBe true
+ }
+
+ property("test heap set state, write state should not equal read state " +
+ "with different key") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val setValue1 = "setValue1"
+ implicit val setValue2 = "setValue2"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val newStateInternals = factory.stateInternalsForKey(newKey)
+
+ val setState = stateInternals.state[SetState[String]](namespace,
+ StateTags.set(stateId, StringUtf8Coder.of))
+ val newSetState = newStateInternals.state(namespace,
+ StateTags.set(stateId, StringUtf8Coder.of))
+
+ setState.add(setValue1)
+ setState.add(setValue2)
+
+ implicit val newSetStateIterator = newSetState.read.iterator()
+
+ var counter = 0
+ while (newSetStateIterator.hasNext) {
+ counter += 1
+ newSetStateIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+
+ property("test heap set state, write state shuold not equal read state " +
+ "with different state id") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val newStateId = "02"
+ implicit val setValue1 = "setValue1"
+ implicit val setValue2 = "setValue2"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+
+ val setState = stateInternals.state(namespace, StateTags.set(stateId,
StringUtf8Coder.of))
+ val newSetState = stateInternals.state(namespace,
+ StateTags.set(newStateId, StringUtf8Coder.of))
+
+ setState.add(setValue1)
+ setState.addIfAbsent(setValue2)
+
+ implicit val setStateIterator = newSetState.read.iterator()
+
+ var counter = 0
+ while (setStateIterator.hasNext) {
+ counter += 1
+ setStateIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+ // endregion
+
+ // region map state
+ property("test map state, generic methods") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val mapStateKey1 = "mapKey01"
+ implicit val mapStateValue1 = "mapValue01"
+ implicit val mapStateKey2 = "mapKey02"
+ implicit val mapStateValue2 = "mapValue02"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val mapState = stateInternals.state(namespace,
+ StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+ mapState.put(mapStateKey1, mapStateValue1)
+
+ implicit var mapKeysIterator = mapState.keys.read.iterator()
+
+ mapState.putIfAbsent(mapStateKey1, mapStateValue2).read shouldBe
mapStateValue1
+
+ var counter = 0
+ while (mapKeysIterator.hasNext) {
+ counter += 1
+ mapKeysIterator.next()
+ }
+
+ counter shouldBe 1
+
+ counter = 0
+ implicit val mapValuesIterator = mapState.values.read.iterator()
+ while (mapValuesIterator.hasNext) {
+ counter += 1
+ mapValuesIterator.next()
+ }
+
+ counter shouldBe 1
+
+ counter = 0
+ implicit val mapEntriesIterator = mapState.entries.read.iterator()
+ while (mapEntriesIterator.hasNext) {
+ counter += 1
+ mapEntriesIterator.next()
+ }
+
+ counter shouldBe 1
+
+ mapState.get(mapStateKey1).read shouldBe mapStateValue1
+ mapState.get("test01").read shouldBe null
+
+ mapState.remove(mapStateKey1)
+
+ counter = 0
+ mapKeysIterator = mapState.keys.read.iterator()
+ while (mapKeysIterator.hasNext) {
+ counter += 1
+ mapKeysIterator.next()
+ }
+
+ counter shouldBe 0
+
+ mapState.putIfAbsent(mapStateKey2, mapStateValue2)
+
+ counter = 0
+ mapKeysIterator = mapState.keys.read.iterator()
+ while (mapKeysIterator.hasNext) {
+ counter += 1
+ mapKeysIterator.next()
+ }
+
+ counter shouldBe 1
+
+ mapState.clear
+
+ counter = 0
+ mapKeysIterator = mapState.keys.read.iterator()
+ while (mapKeysIterator.hasNext) {
+ counter += 1
+ mapKeysIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+
+ property("test map state, write state should not equal read state " +
+ "with different key") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val mapStateKey1 = "mapKey01"
+ implicit val mapStateValue1 = "mapValue01"
+ implicit val mapStateKey2 = "mapKey02"
+ implicit val mapStateValue2 = "mapValue02"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val newStateInternals = factory.stateInternalsForKey(newKey)
+
+ val mapState = stateInternals.state(namespace,
+ StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+ val newMapState = newStateInternals.state(namespace,
+ StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+ mapState.put(mapStateKey1, mapStateValue1)
+
+ mapState.get(mapStateKey1).read shouldBe mapStateValue1
+ newMapState.get(mapStateKey1).read shouldNot be(mapStateValue1)
+ }
+
+ property("test map state, write state should not equal read state " +
+ "with different state id") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val newStateId = "02"
+ implicit val mapStateKey1 = "mapKey01"
+ implicit val mapStateValue1 = "mapValue01"
+ implicit val mapStateKey2 = "mapKey02"
+ implicit val mapStateValue2 = "mapValue02"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of,
map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+
+ val mapState = stateInternals.state(namespace,
+ StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+ val newMapState = stateInternals.state(namespace,
+ StateTags.map(newStateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+ mapState.put(mapStateKey1, mapStateValue1)
+
+ mapState.get(mapStateKey1).read shouldBe mapStateValue1
+ newMapState.get(mapStateKey1).read shouldNot be(mapStateValue1)
+ }
+ // endregion
+
+}