http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
new file mode 100644
index 0000000..f11299e
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import java.util
+import java.util.{Iterator, Map}
+
+import com.google.common.collect.Table
+import org.apache.gearpump.streaming.refactor.coder.StringUtf8Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, SetState, 
ValueState}
+import org.apache.gearpump.streaming.refactor.state.{StateNamespaces, 
StateTags}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/**
+ * Exercises the value, bag, set and map state objects obtained through
+ * [[HeapStateInternalsFactory]].
+ *
+ * Every property builds its own factory on top of a fresh, empty backing map, so no state
+ * is shared between properties. Besides the basic read/write round trips, the properties
+ * check that state written under one key or state id is not visible under another.
+ */
+class HeapStateInternalsSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  /**
+   * Creates a factory with a String key coder, backed by a fresh empty map.
+   *
+   * @return a new factory that shares no backing store with any other factory
+   */
+  private def newFactory(): HeapStateInternalsFactory[String] = {
+    val store: Map[String, Table[String, String, Array[Byte]]] =
+      new util.HashMap[String, Table[String, String, Array[Byte]]]()
+    new HeapStateInternalsFactory[String](StringUtf8Coder.of, store)
+  }
+
+  /**
+   * Consumes a Java iterator to the end and returns its elements in iteration order.
+   *
+   * `next()` is called on every pass through the loop, so the loop always terminates for a
+   * finite iterator regardless of how many elements it yields.
+   *
+   * @param iterator the iterator to exhaust
+   * @tparam T the element type
+   * @return the elements produced by `iterator`, in order
+   */
+  private def drain[T](iterator: util.Iterator[T]): List[T] = {
+    val elements = List.newBuilder[T]
+    while (iterator.hasNext) {
+      elements += iterator.next()
+    }
+    elements.result()
+  }
+
+  property("HeapStateInternalsProxy should return correct key coder") {
+    val factory = newFactory()
+    // NOTE(review): the proxy is only constructed here; the assertion below targets the
+    // factory. The property name suggests the proxy's key coder was meant to be checked -
+    // confirm against HeapStateInternalsProxy's API and assert on the proxy if so.
+    val proxy = new HeapStateInternalsProxy[String](factory)
+
+    factory.getKeyCoder shouldBe StringUtf8Coder.of
+  }
+
+  // region value state
+  property("test heap value state: write heap state should equals read state") {
+    val key = "key"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val value = "hello world"
+
+    val stateInternals = newFactory().stateInternalsForKey(key)
+    val valueState = stateInternals.state[ValueState[String]](namespace,
+      StateTags.value(stateId, StringUtf8Coder.of))
+
+    valueState.write(value)
+    valueState.read shouldBe value
+  }
+
+  property("test heap value state: write heap state should not equals read state " +
+    "for different state id") {
+    val key = "key"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val newStateId = "02"
+    val value = "hello world"
+
+    val stateInternals = newFactory().stateInternalsForKey(key)
+    val valueState = stateInternals.state[ValueState[String]](namespace,
+      StateTags.value(stateId, StringUtf8Coder.of))
+
+    valueState.write(value)
+
+    val newValueState = stateInternals.state[ValueState[String]](namespace,
+      StateTags.value(newStateId, StringUtf8Coder.of))
+    newValueState.read shouldNot be(value)
+  }
+
+  // The name now says "should not", matching the shouldNot assertion in the body.
+  property("test heap value state: write heap state should not equals read state " +
+    "for different key") {
+    val key = "key"
+    val newKey = "newKey"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val value = "hello world"
+
+    val factory = newFactory()
+
+    val stateInternals = factory.stateInternalsForKey(key)
+    val valueState = stateInternals.state[ValueState[String]](namespace,
+      StateTags.value(stateId, StringUtf8Coder.of))
+
+    val newStateInternals = factory.stateInternalsForKey(newKey)
+    val newValueState = newStateInternals.state[ValueState[String]](namespace,
+      StateTags.value(stateId, StringUtf8Coder.of))
+
+    valueState.write(value)
+    newValueState.read shouldNot be(value)
+  }
+  // endregion
+
+  // region bag state
+  property("test heap Bag state: write heap state should equals read state") {
+    val key = "key"
+    val namespace = StateNamespaces.global
+    val bagValue1 = "bagValue1"
+    val bagValue2 = "bagValue2"
+    val stateId = "01"
+
+    val stateInternals = newFactory().stateInternalsForKey(key)
+    val bagState = stateInternals.state[BagState[String]](namespace,
+      StateTags.bag(stateId, StringUtf8Coder.of))
+
+    bagState.add(bagValue1)
+    bagState.add(bagValue2)
+
+    // Exactly the two added values, in insertion order. Draining the iterator first means
+    // an unexpected extra element fails the comparison instead of stalling the loop.
+    drain(bagState.read.iterator()) shouldBe List(bagValue1, bagValue2)
+  }
+
+  property("test heap Bag state: write heap state should not equal read state with " +
+    "different key") {
+    val key = "key"
+    val newKey = "newKey"
+    val namespace = StateNamespaces.global
+    val bagValue1 = "bagValue1"
+    val bagValue2 = "bagValue2"
+    val stateId = "01"
+
+    val factory = newFactory()
+
+    val stateInternals = factory.stateInternalsForKey(key)
+    val newStateInternals = factory.stateInternalsForKey(newKey)
+
+    val bagState = stateInternals.state[BagState[String]](namespace,
+      StateTags.bag(stateId, StringUtf8Coder.of))
+    val newBagState = newStateInternals.state[BagState[String]](namespace,
+      StateTags.bag(stateId, StringUtf8Coder.of))
+
+    bagState.add(bagValue1)
+    bagState.add(bagValue2)
+
+    drain(newBagState.read.iterator()) shouldBe empty
+  }
+
+  property("test heap Bag state: write heap state should not equal read state " +
+    "with different stateId") {
+    val key = "key"
+    val namespace = StateNamespaces.global
+    val bagValue1 = "bagValue1"
+    val bagValue2 = "bagValue2"
+    val stateId = "01"
+    val newStateId = "02"
+
+    val stateInternals = newFactory().stateInternalsForKey(key)
+    val bagState = stateInternals.state[BagState[String]](namespace,
+      StateTags.bag(stateId, StringUtf8Coder.of))
+    val newBagState = stateInternals.state[BagState[String]](namespace,
+      StateTags.bag(newStateId, StringUtf8Coder.of))
+
+    bagState.add(bagValue1)
+    bagState.add(bagValue2)
+
+    drain(newBagState.read.iterator()) shouldBe empty
+  }
+  // endregion
+
+  // region set state
+  property("test heap set state, generic methods") {
+    val key = "key"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val setValue1 = "setValue1"
+    val setValue2 = "setValue2"
+
+    val stateInternals = newFactory().stateInternalsForKey(key)
+    val setState = stateInternals.state[SetState[String]](namespace,
+      StateTags.set(stateId, StringUtf8Coder.of))
+
+    setState.add(setValue1)
+    setState.add(setValue2)
+    drain(setState.read.iterator()) should have size 2
+
+    // Re-adding an existing element is reported as "not added" and leaves the size alone.
+    setState.addIfAbsent(setValue2).read shouldBe false
+    drain(setState.read.iterator()) should have size 2
+
+    setState.contains(setValue1).read shouldBe true
+    setState.contains("setValue03").read shouldBe false
+
+    setState.isEmpty.read shouldBe false
+
+    setState.remove(setValue1)
+    setState.remove(setValue2)
+
+    setState.isEmpty.read shouldBe true
+  }
+
+  property("test heap set state, write state should not equal read state " +
+    "with different key") {
+    val key = "key"
+    val newKey = "newKey"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val setValue1 = "setValue1"
+    val setValue2 = "setValue2"
+
+    val factory = newFactory()
+
+    val stateInternals = factory.stateInternalsForKey(key)
+    val newStateInternals = factory.stateInternalsForKey(newKey)
+
+    val setState = stateInternals.state[SetState[String]](namespace,
+      StateTags.set(stateId, StringUtf8Coder.of))
+    val newSetState = newStateInternals.state(namespace,
+      StateTags.set(stateId, StringUtf8Coder.of))
+
+    setState.add(setValue1)
+    setState.add(setValue2)
+
+    drain(newSetState.read.iterator()) shouldBe empty
+  }
+
+  property("test heap set state, write state should not equal read state " +
+    "with different state id") {
+    val key = "key"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val newStateId = "02"
+    val setValue1 = "setValue1"
+    val setValue2 = "setValue2"
+
+    val stateInternals = newFactory().stateInternalsForKey(key)
+
+    val setState = stateInternals.state(namespace, StateTags.set(stateId, StringUtf8Coder.of))
+    val newSetState = stateInternals.state(namespace,
+      StateTags.set(newStateId, StringUtf8Coder.of))
+
+    setState.add(setValue1)
+    setState.addIfAbsent(setValue2)
+
+    drain(newSetState.read.iterator()) shouldBe empty
+  }
+  // endregion
+
+  // region map state
+  property("test map state, generic methods") {
+    val key = "key"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val mapStateKey1 = "mapKey01"
+    val mapStateValue1 = "mapValue01"
+    val mapStateKey2 = "mapKey02"
+    val mapStateValue2 = "mapValue02"
+
+    val stateInternals = newFactory().stateInternalsForKey(key)
+    val mapState = stateInternals.state(namespace,
+      StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+    mapState.put(mapStateKey1, mapStateValue1)
+
+    // This iterator is taken before putIfAbsent and consumed after it.
+    // NOTE(review): presumably this checks that a rejected putIfAbsent does not disturb an
+    // outstanding key view - confirm the ordering is intentional.
+    val keysAfterPut = mapState.keys.read.iterator()
+
+    mapState.putIfAbsent(mapStateKey1, mapStateValue2).read shouldBe mapStateValue1
+
+    drain(keysAfterPut) should have size 1
+    drain(mapState.values.read.iterator()) should have size 1
+    drain(mapState.entries.read.iterator()) should have size 1
+
+    mapState.get(mapStateKey1).read shouldBe mapStateValue1
+    mapState.get("test01").read shouldBe null
+
+    mapState.remove(mapStateKey1)
+
+    drain(mapState.keys.read.iterator()) shouldBe empty
+
+    mapState.putIfAbsent(mapStateKey2, mapStateValue2)
+
+    drain(mapState.keys.read.iterator()) should have size 1
+
+    mapState.clear
+
+    drain(mapState.keys.read.iterator()) shouldBe empty
+  }
+
+  property("test map state, write state should not equal read state " +
+    "with different key") {
+    val key = "key"
+    val newKey = "newKey"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val mapStateKey1 = "mapKey01"
+    val mapStateValue1 = "mapValue01"
+
+    val factory = newFactory()
+
+    val stateInternals = factory.stateInternalsForKey(key)
+    val newStateInternals = factory.stateInternalsForKey(newKey)
+
+    val mapState = stateInternals.state(namespace,
+      StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+    val newMapState = newStateInternals.state(namespace,
+      StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+    mapState.put(mapStateKey1, mapStateValue1)
+
+    mapState.get(mapStateKey1).read shouldBe mapStateValue1
+    newMapState.get(mapStateKey1).read shouldNot be(mapStateValue1)
+  }
+
+  property("test map state, write state should not equal read state " +
+    "with different state id") {
+    val key = "key"
+    val namespace = StateNamespaces.global
+    val stateId = "01"
+    val newStateId = "02"
+    val mapStateKey1 = "mapKey01"
+    val mapStateValue1 = "mapValue01"
+
+    val stateInternals = newFactory().stateInternalsForKey(key)
+
+    val mapState = stateInternals.state(namespace,
+      StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+    val newMapState = stateInternals.state(namespace,
+      StateTags.map(newStateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+    mapState.put(mapStateKey1, mapStateValue1)
+
+    mapState.get(mapStateKey1).read shouldBe mapStateValue1
+    newMapState.get(mapStateKey1).read shouldNot be(mapStateValue1)
+  }
+  // endregion
+
+}

Reply via email to