http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java
new file mode 100644
index 0000000..3e299a6
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class StructuredCoder<T> extends Coder<T> {
+    protected StructuredCoder() {}
+
+    public List<? extends Coder<?>> getComponents() {
+        List<? extends Coder<?>> coderArguments = getCoderArguments();
+        if (coderArguments == null) {
+            return Collections.emptyList();
+        } else {
+            return coderArguments;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || this.getClass() != o.getClass()) {
+            return false;
+        }
+        StructuredCoder<?> that = (StructuredCoder<?>) o;
+        return this.getComponents().equals(that.getComponents());
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode() * 31 + getComponents().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        String s = getClass().getName();
+        builder.append(s.substring(s.lastIndexOf('.') + 1));
+
+        List<? extends Coder<?>> componentCoders = getComponents();
+        if (!componentCoders.isEmpty()) {
+            builder.append('(');
+            boolean first = true;
+            for (Coder<?> componentCoder : componentCoders) {
+                if (first) {
+                    first = false;
+                } else {
+                    builder.append(',');
+                }
+                builder.append(componentCoder.toString());
+            }
+            builder.append(')');
+        }
+        return builder.toString();
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+        return false;
+    }
+
+    @Override
+    public Object structuralValue(T value) {
+        if (value != null && consistentWithEquals()) {
+            return value;
+        } else {
+            try {
+                ByteArrayOutputStream os = new ByteArrayOutputStream();
+                encode(value, os);
+                return new StructuralByteArray(os.toByteArray());
+            } catch (Exception exn) {
+                throw new IllegalArgumentException(
+                        "Unable to encode element '" + value + "' with coder 
'" + this + "'.", exn);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java
new file mode 100644
index 0000000..bebc1e4
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class VarInt {
+
+    private static long convertIntToLongNoSignExtend(int v) {
+        return v & 0xFFFFFFFFL;
+    }
+
+    public static void encode(int v, OutputStream stream) throws IOException {
+        encode(convertIntToLongNoSignExtend(v), stream);
+    }
+
+    public static void encode(long v, OutputStream stream) throws IOException {
+        do {
+            // Encode next 7 bits + terminator bit
+            long bits = v & 0x7F;
+            v >>>= 7;
+            byte b = (byte) (bits | ((v != 0) ? 0x80 : 0));
+            stream.write(b);
+        } while (v != 0);
+    }
+
+    public static int decodeInt(InputStream stream) throws IOException {
+        long r = decodeLong(stream);
+        if (r < 0 || r >= 1L << 32) {
+            throw new IOException("varint overflow " + r);
+        }
+        return (int) r;
+    }
+
+    public static long decodeLong(InputStream stream) throws IOException {
+        long result = 0;
+        int shift = 0;
+        int b;
+        do {
+            // Get 7 bits from next byte
+            b = stream.read();
+            if (b < 0) {
+                if (shift == 0) {
+                    throw new EOFException();
+                } else {
+                    throw new IOException("varint not terminated");
+                }
+            }
+            long bits = b & 0x7F;
+            if (shift >= 64 || (shift == 63 && bits > 1)) {
+                // Out of range
+                throw new IOException("varint too long");
+            }
+            result |= bits << shift;
+            shift += 7;
+        } while ((b & 0x80) != 0);
+        return result;
+    }
+
+    public static int getLength(int v) {
+        return getLength(convertIntToLongNoSignExtend(v));
+    }
+
+    public static int getLength(long v) {
+        int result = 0;
+        do {
+            result++;
+            v >>>= 7;
+        } while (v != 0);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java
new file mode 100644
index 0000000..7dac822
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+public class VarIntCoder extends AtomicCoder<Integer> {
+
+    public static VarIntCoder of() {
+        return INSTANCE;
+    }
+
+    
/////////////////////////////////////////////////////////////////////////////
+
+    private static final VarIntCoder INSTANCE = new VarIntCoder();
+
+    private VarIntCoder() {}
+
+    @Override
+    public void encode(Integer value, OutputStream outStream)
+            throws CoderException {
+        if (value == null) {
+            throw new CoderException("cannot encode a null Integer");
+        }
+        try {
+            VarInt.encode(value.intValue(), outStream);
+        } catch (IOException e) {
+            throw new CoderException(e);
+        }
+    }
+
+    @Override
+    public Integer decode(InputStream inStream)
+            throws CoderException {
+        try {
+            return VarInt.decodeInt(inStream);
+        } catch (EOFException | UTFDataFormatException exn) {
+            // These exceptions correspond to decoding problems, so change
+            // what kind of exception they're branded as.
+            throw new CoderException(exn);
+        } catch (Exception e) {
+            throw new CoderException(e);
+        }
+    }
+
+    @Override
+    public void verifyDeterministic() {}
+
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Integer value) {
+        return true;
+    }
+
+    @Override
+    public long getEncodedElementByteSize(Integer value) {
+        if (value == null) {
+            throw new CoderException("cannot encode a null Integer");
+        }
+        return VarInt.getLength(value.longValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java
new file mode 100644
index 0000000..15af634
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.List;
+
+public class VarLongCoder extends StructuredCoder<Long> {
+    public static VarLongCoder of() {
+        return INSTANCE;
+    }
+
+    
/////////////////////////////////////////////////////////////////////////////
+
+    private static final VarLongCoder INSTANCE = new VarLongCoder();
+
+    private VarLongCoder() {}
+
+    @Override
+    public void encode(Long value, OutputStream outStream)
+            throws CoderException {
+        if (value == null) {
+            throw new CoderException("cannot encode a null Long");
+        }
+        try {
+            VarInt.encode(value.longValue(), outStream);
+        } catch (IOException e) {
+            throw new CoderException(e);
+        }
+    }
+
+    @Override
+    public Long decode(InputStream inStream)
+            throws CoderException {
+        try {
+            return VarInt.decodeLong(inStream);
+        } catch (EOFException | UTFDataFormatException exn) {
+            // These exceptions correspond to decoding problems, so change
+            // what kind of exception they're branded as.
+            throw new CoderException(exn);
+        } catch (Exception e) {
+            throw new CoderException(e);
+        }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() {}
+
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Long value) {
+        return true;
+    }
+
+    @Override
+    public long getEncodedElementByteSize(Long value) {
+        if (value == null) {
+            throw new CoderException("cannot encode a null Long");
+        }
+        return VarInt.getLength(value.longValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java
new file mode 100644
index 0000000..f4d00f1
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class VoidCoder extends AtomicCoder<Void> {
+
+    public static VoidCoder of() {
+        return INSTANCE;
+    }
+
+    
/////////////////////////////////////////////////////////////////////////////
+
+    private static final VoidCoder INSTANCE = new VoidCoder();
+
+    private VoidCoder() {
+    }
+
+    @Override
+    public void encode(Void value, OutputStream outStream) {
+        // Nothing to write!
+    }
+
+    @Override
+    public Void decode(InputStream inStream) {
+        // Nothing to read!
+        return null;
+    }
+
+    @Override
+    public void verifyDeterministic() {
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Void value) {
+        return true;
+    }
+
+    @Override
+    protected long getEncodedElementByteSize(Void value) {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
index 17c93bd..e706f4f 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
@@ -20,10 +20,16 @@ package 
org.apache.gearpump.streaming.refactor.dsl.window.impl
 import org.apache.gearpump.Message
 import org.apache.gearpump.streaming.dsl.window.api.Trigger
 
+<<<<<<< 
HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
 trait ReduceFnRunner {
 
   def process(message: Message): Unit
 
   def onTrigger(trigger: Trigger): Unit
+=======
+trait State {
+
+  def clear: Unit
+>>>>>>> e6ce91c... [Gearpump 311] refactor state 
management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
index d0b84cb..6665766 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+<<<<<<< 
HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
 package org.apache.gearpump.streaming.refactor.sink
 
 import akka.actor.ActorSystem
@@ -33,4 +34,20 @@ object DataSinkProcessor {
     Processor[DataSinkTask](parallelism, description = description,
       taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
   }
+=======
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.state.api.State
+
+trait StateTag[StateT <: State] extends Serializable {
+
+  def appendTo(sb: Appendable)
+
+  def getId: String
+
+  def getSpec: StateSpec[StateT]
+
+  def bind(binder: StateBinder): StateT
+
+>>>>>>> e6ce91c... [Gearpump 311] refactor state 
management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala
new file mode 100644
index 0000000..4dbb07f
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util
+import java.util.Map.Entry
+import java.util.{ArrayList, HashSet, List, Set}
+import java.lang.Iterable
+
+import com.google.common.collect.{HashBasedTable, Table}
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import 
org.apache.gearpump.streaming.refactor.state.InMemoryGlobalStateInternals.InMemoryStateBinder
+import org.apache.gearpump.streaming.refactor.state.api._
+
+class InMemoryGlobalStateInternals[K] protected(key: K) extends StateInternals 
{
+
+  protected val inMemoryStateTable: InMemoryGlobalStateInternals.StateTable =
+    new InMemoryGlobalStateInternals.StateTable {
+      override def binderForNamespace(namespace: StateNamespace): StateBinder 
= {
+        new InMemoryStateBinder
+      }
+  }
+
+  override def getKey: Any = key
+
+  override def state[T <: State](namespace: StateNamespace, address: 
StateTag[T]): T =
+    inMemoryStateTable.get(namespace, address)
+
+}
+
+object InMemoryGlobalStateInternals {
+
+  abstract class StateTable {
+
+    val stateTable: Table[StateNamespace, StateTag[_], State] = 
HashBasedTable.create()
+
+    def get[StateT <: State](namespace: StateNamespace, tag: 
StateTag[StateT]): StateT = {
+      val storage: State = stateTable.get(namespace, tag)
+      if (storage != null) {
+        storage.asInstanceOf[StateT]
+      }
+
+      val typedStorage: StateT = tag.getSpec.bind(tag.getId, 
binderForNamespace(namespace))
+      stateTable.put(namespace, tag, typedStorage)
+      typedStorage
+    }
+
+    def clearNamespace(namespace: StateNamespace): Unit = 
stateTable.rowKeySet().remove(namespace)
+
+    def clear: Unit = stateTable.clear()
+
+    def values: Iterable[State] = 
stateTable.values().asInstanceOf[Iterable[State]]
+
+    def isNamespaceInUse(namespace: StateNamespace): Boolean = 
stateTable.containsRow(namespace)
+
+    def getTagsInUse(namespace: StateNamespace): java.util.Map[StateTag[_], 
State]
+      = stateTable.row(namespace)
+
+    def getNamespacesInUse(): java.util.Set[StateNamespace] = 
stateTable.rowKeySet()
+
+    def binderForNamespace(namespace: StateNamespace): StateBinder
+
+  }
+
+  class InMemoryStateBinder extends StateBinder {
+
+    override def bindValue[T](id: String, spec: StateSpec[ValueState[T]],
+        coder: Coder[T]): ValueState[T] = new InMemoryValueState[T]()
+
+    override def bindBag[T](id: String, spec: StateSpec[BagState[T]],
+        elemCoder: Coder[T]): BagState[T] = new InMemoryBagState[T]()
+
+    override def bindSet[T](id: String, spec: StateSpec[SetState[T]],
+        elemCoder: Coder[T]): SetState[T] = new InMemorySetState[T]()
+
+    override def bindMap[KeyT, ValueT](id: String, spec: 
StateSpec[MapState[KeyT, ValueT]],
+        mapKeyCoder: Coder[KeyT], mapValueCoder: Coder[ValueT]): 
MapState[KeyT, ValueT] =
+      new InMemoryMapState[KeyT, ValueT]()
+  }
+
+  trait InMemoryState[T <: InMemoryState[T]] {
+
+    def isCleared: Boolean
+
+    def copy: T
+
+  }
+
+  class InMemoryBagState[T] extends BagState[T] with 
InMemoryState[InMemoryBagState[T]] {
+
+    private var contents: List[T] = new ArrayList[T]
+
+    override def readLater: BagState[T] = this
+
+    override def isCleared: Boolean = contents.isEmpty
+
+    override def copy: InMemoryBagState[T] = {
+      val that: InMemoryBagState[T] = new InMemoryBagState[T]
+      that.contents.addAll(this.contents)
+      that
+    }
+
+    override def add(value: T): Unit = contents.add(value)
+
+    override def isEmpty: ReadableState[Boolean] = {
+      new ReadableState[Boolean] {
+        override def readLater: ReadableState[Boolean] = {
+          this
+        }
+
+        override def read: Boolean = {
+          contents.isEmpty
+        }
+      }
+    }
+
+    override def clear: Unit = contents = new ArrayList[T]
+
+    override def read: Iterable[T] = contents.asInstanceOf[Iterable[T]]
+
+  }
+
+  class InMemoryValueState[T] extends ValueState[T] with 
InMemoryState[InMemoryValueState[T]] {
+
+    private var cleared: Boolean = true
+    private var value: T = _
+
+    def write(input: T): Unit = {
+      cleared = false
+      this.value = input
+    }
+
+    def readLater: InMemoryValueState[T] = this
+
+    def isCleared: Boolean = cleared
+
+    def copy: InMemoryValueState[T] = {
+      val that: InMemoryValueState[T] = new InMemoryValueState[T]
+      if (!this.cleared) {
+        that.cleared = this.cleared
+        that.value = this.value
+      }
+
+      that
+    }
+
+    def clear: Unit = {
+      value = null.asInstanceOf[T]
+      cleared = true
+    }
+
+    def read: T = value
+
+  }
+
+  class InMemoryMapState[K, V] extends MapState[K, V] with 
InMemoryState[InMemoryMapState[K, V]] {
+
+    private var contents: util.Map[K, V] = new util.HashMap[K, V]
+
+    override def put(key: K, value: V): Unit = contents.put(key, value)
+
+    override def putIfAbsent(key: K, value: V): ReadableState[V] = {
+      var v: V = contents.get(key)
+      if (v == null) {
+        v = contents.put(key, value)
+      }
+
+      ReadableStates.immediate(v)
+    }
+
+    override def remove(key: K): Unit = contents.remove(key)
+
+    override def get(key: K): ReadableState[V] = 
ReadableStates.immediate(contents.get(key))
+
+    override def keys: ReadableState[Iterable[K]] =
+      ReadableStates.immediate(contents.keySet().asInstanceOf[Iterable[K]])
+
+    override def values: ReadableState[Iterable[V]] =
+      ReadableStates.immediate(contents.values().asInstanceOf[Iterable[V]])
+
+    override def entries: ReadableState[Iterable[Entry[K, V]]] =
+      
ReadableStates.immediate(contents.entrySet().asInstanceOf[Iterable[util.Map.Entry[K,
 V]]])
+
+    override def isCleared: Boolean = contents.isEmpty
+
+    override def copy: InMemoryMapState[K, V] = {
+      val that: InMemoryMapState[K, V] = new InMemoryMapState
+      that.contents.putAll(this.contents)
+      that
+    }
+
+    override def clear: Unit = contents = new util.HashMap[K, V]
+
+  }
+
+  class InMemorySetState[T] extends SetState[T] with 
InMemoryState[InMemorySetState[T]] {
+
+    private var contents: Set[T] = new HashSet[T]
+
+    override def contains(t: T): ReadableState[Boolean] =
+      ReadableStates.immediate(contents.contains(t))
+
+    override def addIfAbsent(t: T): ReadableState[Boolean] = {
+      val alreadyContained: Boolean = contents.contains(t)
+      contents.add(t)
+      ReadableStates.immediate(!alreadyContained)
+    }
+
+    override def remove(t: T): Unit = contents.remove(t)
+
+    override def readLater: SetState[T] = this
+
+    override def isCleared: Boolean = contents.isEmpty
+
+    override def copy: InMemorySetState[T] = {
+      val that: InMemorySetState[T] = new InMemorySetState[T]
+      that.contents.addAll(this.contents)
+      that
+    }
+
+    override def add(value: T): Unit = contents.add(value)
+
+    override def isEmpty: ReadableState[Boolean] = {
+      new ReadableState[Boolean] {
+
+        override def readLater: ReadableState[Boolean] = this
+
+        override def read: Boolean = contents.isEmpty
+      }
+    }
+
+    override def clear: Unit = contents = new HashSet[T]
+
+    override def read: Iterable[T] = contents.asInstanceOf[Iterable[T]]
+  }
+
+}
+
+object ReadableStates {
+
+  def immediate[T](value: T): ReadableState[T] = {
+    new ReadableState[T] {
+      override def readLater: ReadableState[T] = {
+        this
+      }
+
+      override def read: T = {
+        value
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
index c387960..8832aee 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
@@ -23,10 +23,16 @@ import java.time.Instant
 import org.apache.gearpump.streaming.refactor.coder.Coder
 import org.apache.gearpump.streaming.refactor.state.api.StateInternals
 
+<<<<<<< 
HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
 /**
  *
  */
 trait RuntimeContext {
+=======
+trait StateSpec[StateT <: State] extends Serializable {
+
+  def bind(id: String, binder: StateBinder): StateT
+>>>>>>> e6ce91c... [Gearpump 311] refactor state 
management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
 
   def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala
new file mode 100644
index 0000000..db39142
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, 
SetState, ValueState}
+
+trait StateBinder {
+
+  def bindValue[T](id: String, spec: StateSpec[ValueState[T]], coder: 
Coder[T]): ValueState[T]
+
+  def bindBag[T](id: String, spec: StateSpec[BagState[T]], elemCoder: 
Coder[T]): BagState[T]
+
+  def bindSet[T](id: String, spec: StateSpec[SetState[T]], elemCoder: 
Coder[T]): SetState[T]
+
+  def bindMap[KeyT, ValueT](id: String, spec: StateSpec[MapState[KeyT, 
ValueT]],
+      mapKeyCoder: Coder[KeyT], mapValueCoder: Coder[ValueT]): MapState[KeyT, 
ValueT]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespace.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespace.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespace.scala
new file mode 100644
index 0000000..dbc2320
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespace.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+trait StateNamespace {
+
+  def stringKey: String
+
+  def appendTo(sb: Appendable): Unit
+
+  def getCacheKey: Object
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespaces.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespaces.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespaces.scala
new file mode 100644
index 0000000..c2cba51
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespaces.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+object StateNamespaces {
+
+  def global: StateNamespace = {
+    new GlobalNameSpace
+  }
+
+  private object NameSpace extends Enumeration {
+    type NameSpace = Value
+    val GLOBAL, WINDOW, WINDOW_AND_TRIGGER = Value
+  }
+
+  class GlobalNameSpace extends StateNamespace {
+
+    private val GLOBAL_STRING: String = "/"
+
+    override def stringKey: String = {
+      GLOBAL_STRING
+    }
+
+    override def appendTo(sb: Appendable): Unit = {
+      sb.append(GLOBAL_STRING)
+    }
+
+    override def getCacheKey: AnyRef = {
+      GLOBAL_STRING
+    }
+
+    override def equals(obj: Any): Boolean = {
+      obj == this || obj.isInstanceOf[GlobalNameSpace]
+    }
+
+    override def hashCode(): Int = {
+      Objects.hash(NameSpace.GLOBAL)
+    }
+  }
+
+  // TODO : implement WindowNamespace & WindowAndTriggerNamespace
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
new file mode 100644
index 0000000..f056915
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, 
SetState, ValueState}
+
+object StateSpecs {
+
+  private class ValueStateSpec[T](coder: Coder[T]) extends 
StateSpec[ValueState[T]] {
+
+    var aCoder: Coder[T] = coder
+
+    override def bind(id: String, binder: StateBinder): ValueState[T] = {
+      binder.bindValue(id, this, aCoder)
+    }
+
+    override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+      if (this.aCoder == null) {
+        if (coders(0) != null) {
+          this.aCoder = coders(0).asInstanceOf[Coder[T]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (aCoder == null) throw new IllegalStateException(
+        "Unable to infer a coder for ValueState and no Coder"
+        + " was specified. Please set a coder by either invoking"
+        + " StateSpecs.value(Coder<T> valueCoder) or by registering the coder 
in the"
+        + " Pipeline's CoderRegistry.")
+    }
+
+    override def equals(obj: Any): Boolean = {
+      var result = false
+      if (obj == this) result = true
+
+      if (!(obj.isInstanceOf[ValueStateSpec[T]])) result = false
+
+      val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]]
+      result = Objects.equals(this.aCoder, that.aCoder)
+      result
+    }
+
+    override def hashCode(): Int = {
+      Objects.hashCode(this.aCoder)
+    }
+  }
+
+  private class BagStateSpec[T](coder: Coder[T]) extends 
StateSpec[BagState[T]] {
+
+    private implicit var elemCoder = coder
+
+    override def bind(id: String, binder: StateBinder): BagState[T] =
+      binder.bindBag(id, this, elemCoder)
+
+    override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = {
+      if (this.elemCoder == null) {
+        if (coders(0) != null) {
+          this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (elemCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for BagState 
and no Coder"
+          + " was specified. Please set a coder by either invoking"
+          + " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder 
in the"
+          + " Pipeline's CoderRegistry.");
+      }
+    }
+
+    override def equals(obj: Any): Boolean = {
+      var result = false
+      if (obj == this) result = true
+
+      if (!obj.isInstanceOf[BagStateSpec[_]]) result = false
+
+      val that = obj.asInstanceOf[BagStateSpec[_]]
+      result = Objects.equals(this.elemCoder, that.elemCoder)
+      result
+    }
+
+    override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+  }
+
+  private class MapStateSpec[K, V](keyCoder: Coder[K], valueCoder: Coder[V])
+    extends StateSpec[MapState[K, V]] {
+
+    private implicit var kCoder = keyCoder
+    private implicit var vCoder = valueCoder
+
+    override def bind(id: String, binder: StateBinder): MapState[K, V] =
+      binder.bindMap(id, this, keyCoder, valueCoder)
+
+    override def offerCoders(coders: Array[Coder[MapState[K, V]]]): Unit = {
+      if (this.kCoder == null) {
+        if (coders(0) != null) {
+          this.kCoder = coders(0).asInstanceOf[Coder[K]]
+        }
+      }
+
+      if (this.vCoder == null) {
+        if (coders(1) != null) {
+          this.vCoder = coders(1).asInstanceOf[Coder[V]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (keyCoder == null || valueCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for MapState 
and no Coder"
+          + " was specified. Please set a coder by either invoking"
+          + " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by 
registering the"
+          + " coder in the Pipeline's CoderRegistry.");
+      }
+    }
+
+    override def hashCode(): Int = Objects.hash(getClass, kCoder, vCoder)
+
+    override def equals(obj: Any): Boolean = {
+      var result = false
+      if (obj == this) result = true
+
+      if (!obj.isInstanceOf[MapStateSpec[_, _]]) result = false
+
+      implicit var that = obj.asInstanceOf[MapStateSpec[_, _]]
+      result = Objects.equals(this.kCoder, that.vCoder) && 
Objects.equals(this.vCoder, that.vCoder)
+      result
+    }
+  }
+
+  private class SetStateSpec[T](coder: Coder[T]) extends 
StateSpec[SetState[T]] {
+
+    private implicit var elemCoder = coder
+
+    override def bind(id: String, binder: StateBinder): SetState[T] =
+      binder.bindSet(id, this, elemCoder)
+
+    override def offerCoders(coders: Array[Coder[SetState[T]]]): Unit = {
+      if (this.elemCoder == null) {
+        if (coders(0) != null) {
+          this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (elemCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for SetState 
and no Coder"
+          + " was specified. Please set a coder by either invoking"
+          + " StateSpecs.set(Coder<T> elemCoder) or by registering the coder 
in the"
+          + " Pipeline's CoderRegistry.");
+      }
+    }
+
+    override def equals(obj: Any): Boolean = {
+      var result = false
+      if (obj == this) result = true
+
+      if (!obj.isInstanceOf[SetStateSpec[_]]) result = false
+
+      implicit var that = obj.asInstanceOf[SetStateSpec[_]]
+      result = Objects.equals(this.elemCoder, that.elemCoder)
+      result
+    }
+
+    override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+  }
+
+  def value[T]: StateSpec[ValueState[T]] = new ValueStateSpec[T](null)
+
+  def value[T](valueCoder: Coder[T]): StateSpec[ValueState[T]] = {
+    if (valueCoder == null) {
+      throw new NullPointerException("valueCoder should not be null. Consider 
value() instead")
+    }
+
+    new ValueStateSpec[T](valueCoder)
+  }
+
+  def bag[T]: StateSpec[BagState[T]] = new BagStateSpec[T](null)
+
+  def bag[T](elemCoder: Coder[T]): StateSpec[BagState[T]] = new 
BagStateSpec[T](elemCoder)
+
+  def set[T]: StateSpec[SetState[T]] = new SetStateSpec[T](null)
+
+  def set[T](elemCoder: Coder[T]): StateSpec[SetState[T]] = new 
SetStateSpec[T](elemCoder)
+
+  def map[K, V]: StateSpec[MapState[K, V]] = new MapStateSpec[K, V](null, null)
+
+  def map[K, V](keyCoder: Coder[K], valueCoder: Coder[V]): 
StateSpec[MapState[K, V]] =
+    new MapStateSpec[K, V](keyCoder, valueCoder)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
new file mode 100644
index 0000000..cbd050a
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import 
org.apache.gearpump.streaming.refactor.state.StateTags.StateKind.StateKind
+import org.apache.gearpump.streaming.refactor.state.api._
+
+object StateTags {
+
+  object StateKind extends Enumeration {
+    type StateKind = Value
+    val SYSTEM = Value("s")
+    val USER = Value("u")
+  }
+
+  private trait SystemStateTag[StateT <: State] {
+    def asKind(kind: StateKind): StateTag[StateT]
+  }
+
+  def tagForSpec[StateT <: State](id: String, spec: StateSpec[StateT]): 
StateTag[StateT] =
+    new SimpleStateTag[StateT](new StructureId(id), spec)
+
+  def value[T](id: String, valueCoder: Coder[T]): StateTag[ValueState[T]] =
+    new SimpleStateTag[ValueState[T]](new StructureId(id), 
StateSpecs.value(valueCoder))
+
+  def bag[T](id: String, elemCoder: Coder[T]): StateTag[BagState[T]] =
+    new SimpleStateTag[BagState[T]](new StructureId(id), 
StateSpecs.bag(elemCoder))
+
+  def set[T](id: String, elemCoder: Coder[T]): StateTag[SetState[T]] =
+    new SimpleStateTag[SetState[T]](new StructureId(id), 
StateSpecs.set(elemCoder))
+
+  def map[K, V](id: String, keyCoder: Coder[K], valueCoder: Coder[V]): 
StateTag[MapState[K, V]] =
+    new SimpleStateTag[MapState[K, V]](new StructureId(id), 
StateSpecs.map(keyCoder, valueCoder))
+
+  private class SimpleStateTag[StateT <: State](id: StructureId, spec: 
StateSpec[StateT])
+    extends StateTag[StateT] with SystemStateTag[StateT] {
+
+    val aSpec: StateSpec[StateT] = spec
+    val aId: StructureId = id
+
+    override def appendTo(sb: Appendable): Unit = aId.appendTo(sb)
+
+
+    override def getId: String = id.getRawId
+
+    override def getSpec: StateSpec[StateT] = aSpec
+
+    override def bind(binder: StateBinder): StateT = aSpec.bind(aId.getRawId, 
binder)
+
+    override def asKind(kind: StateKind): StateTag[StateT] =
+      new SimpleStateTag[StateT](aId.asKind(kind), aSpec)
+
+    override def hashCode(): Int = Objects.hash(getClass, getId, getSpec)
+
+    override def equals(obj: Any): Boolean = {
+      if (!(obj.isInstanceOf[SimpleStateTag[_]])) false
+
+      val otherTag: SimpleStateTag[_] = obj.asInstanceOf[SimpleStateTag[_]]
+      Objects.equals(getId, otherTag.getId) && Objects.equals(getSpec, 
otherTag.getSpec)
+    }
+  }
+
+  private class StructureId(kind: StateKind, rawId: String) extends 
Serializable {
+
+    private val k: StateKind = kind
+    private val r: String = rawId
+
+    def this(rawId: String) {
+      this(StateKind.USER, rawId)
+    }
+
+    def asKind(kind: StateKind): StructureId = new StructureId(kind, r)
+
+    def appendTo(sb: Appendable): Unit = sb.append(k.toString).append(r)
+
+    def getRawId: String = r
+
+    override def hashCode(): Int = Objects.hash(k, r)
+
+    override def equals(obj: Any): Boolean = {
+      if (obj == this) true
+
+      if (!(obj.isInstanceOf[StructureId])) false
+
+      val that : StructureId = obj.asInstanceOf[StructureId]
+      Objects.equals(k, that.k) && Objects.equals(r, that.r)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
index 6d72e78..0f94052 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
@@ -34,6 +34,13 @@ import 
org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
 
+<<<<<<< HEAD
+=======
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+>>>>>>> e6ce91c... [Gearpump 311] refactor state management
 abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
 
@@ -53,7 +60,11 @@ abstract class StatefulTask(taskContext: TaskContext, conf: 
UserConfig)
   // core state data
   var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = 
null
 
+<<<<<<< HEAD
   def open(runtimeContext: RuntimeContext): Unit = {}
+=======
+  def open: Unit = {}
+>>>>>>> e6ce91c... [Gearpump 311] refactor state management
 
   def invoke(message: Message): Unit
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala
new file mode 100644
index 0000000..38d918e
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+trait BagState[T] extends GroupingState[T, Iterable[T]] {
+
+  def readLater: BagState[T]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
new file mode 100644
index 0000000..640cc9e
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait CombiningState[InputT, AccumT, OutputT] extends GroupingState[InputT, 
OutputT] {
+
+  def getAccum: AccumT
+
+  def addAccum(accumT: AccumT)
+
+  def mergeAccumulators(accumulators: Iterable[AccumT]): AccumT
+
+  def readLater: CombiningState[InputT, AccumT, OutputT]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
new file mode 100644
index 0000000..2f8939a
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait GroupingState[InputT, OutputT] extends ReadableState[OutputT] with State 
{
+
+  def add(value: InputT): Unit
+
+  def isEmpty: ReadableState[Boolean]
+
+  def readLater: GroupingState[InputT, OutputT]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
new file mode 100644
index 0000000..25de704
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+trait MapState[K, V] extends State {
+
+  def put(key : K, value : V): Unit
+
+  def putIfAbsent(key : K, value : V): ReadableState[V]
+
+  def remove(key : K): Unit
+
+  def get(key : K): ReadableState[V]
+
+  def keys: ReadableState[Iterable[K]]
+
+  def values: ReadableState[Iterable[V]]
+
+  def entries: ReadableState[Iterable[java.util.Map.Entry[K, V]]]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
new file mode 100644
index 0000000..f6f4d98
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait ReadableState[T] {
+
+  def read: T
+
+  def readLater: ReadableState[T]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
new file mode 100644
index 0000000..e1990b2
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+trait SetState[T] extends GroupingState[T, Iterable[T]]{
+
+  def contains(t: T): ReadableState[Boolean]
+
+  def addIfAbsent(t: T): ReadableState[Boolean]
+
+  def remove(t: T): Unit
+
+  def readLater: SetState[T]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
new file mode 100644
index 0000000..e3a136d
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import org.apache.gearpump.streaming.refactor.state.{StateNamespace, StateTag}
+
+trait StateInternals {
+
+  def getKey: Any
+
+  def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
new file mode 100644
index 0000000..215528c
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait StateInternalsFactory[K] extends Serializable {
+
+  def stateInternalsForKey(key: K): StateInternals
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
new file mode 100644
index 0000000..3555ec4
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait ValueState[T] extends ReadableState[T] with State {
+
+  def write(input : T): Unit
+
+  def readLater: ValueState[T]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
new file mode 100644
index 0000000..12b6e42
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, 
OutputStream}
+import java.lang.Iterable
+import java.util
+import java.util.Map.Entry
+import java.util._
+import java.util.Objects
+
+import com.google.common.collect.Table
+import org.apache.gearpump.streaming.refactor.coder.{Coder, ListCoder, 
MapCoder, SetCoder}
+import org.apache.gearpump.streaming.refactor.state.{StateBinder, 
StateNamespace, StateSpec, StateTag}
+import org.apache.gearpump.streaming.refactor.state.api._
+import org.apache.gearpump.util.LogUtil
+
+class HeapStateInternals[K](key: K, stateTable: Table[String, String, 
Array[Byte]])
+  extends StateInternals {
+
+  val LOG = LogUtil.getLogger(getClass)
+
+  private class HeapStateBinder(namespace: StateNamespace, address: 
StateTag[_])
+    extends StateBinder {
+
+    private val ns: StateNamespace = namespace
+    private val addr: StateTag[_] = address
+
+    override def bindValue[T](id: String, spec: StateSpec[ValueState[T]],
+        coder: Coder[T]): ValueState[T] =
+      new HeapValueState[T](ns, addr.asInstanceOf[StateTag[ValueState[T]]], 
coder)
+
+    override def bindBag[T](id: String, spec: StateSpec[BagState[T]],
+        elemCoder: Coder[T]): BagState[T] =
+      new HeapBagState[T](ns, addr.asInstanceOf[StateTag[BagState[T]]], 
elemCoder)
+
+    override def bindSet[T](id: String, spec: StateSpec[SetState[T]],
+        elemCoder: Coder[T]): SetState[T] =
+      new HeapSetState[T](ns, addr.asInstanceOf[StateTag[SetState[T]]], 
elemCoder)
+
+    override def bindMap[KeyT, ValueT](id: String, spec: 
StateSpec[MapState[KeyT, ValueT]],
+        mapKeyCoder: Coder[KeyT], mapValueCoder: Coder[ValueT]): 
MapState[KeyT, ValueT] =
+      new HeapMapState[KeyT, ValueT](ns,
+        addr.asInstanceOf[StateTag[MapState[KeyT, ValueT]]], mapKeyCoder, 
mapValueCoder)
+
+  }
+
+  override def getKey: Any = key
+
+  override def state[T <: State](namespace: StateNamespace, address: 
StateTag[T]): T =
+    address.bind(new HeapStateBinder(namespace, address))
+
+  private class AbstractState[T](namespace: StateNamespace, address: 
StateTag[_ <: State],
+      coder: Coder[T]) {
+
+    protected val ns: StateNamespace = namespace
+    protected val addr: StateTag[_ <: State] = address
+    protected val c: Coder[T] = coder
+
+    protected def readValue: T = {
+      var value: T = null.asInstanceOf[T]
+      val buf: Array[Byte] = stateTable.get(ns.stringKey, addr.getId)
+      if (buf != null) {
+        val is: InputStream = new ByteArrayInputStream(buf)
+        try {
+          value = c.decode(is)
+        } catch {
+          case ex: Exception => throw new RuntimeException(ex)
+        }
+      }
+
+      value
+    }
+
+    def writeValue(input: T): Unit = {
+      val output: ByteArrayOutputStream = new ByteArrayOutputStream();
+      try {
+        c.encode(input, output)
+        stateTable.put(ns.stringKey, addr.getId, output.toByteArray)
+      } catch {
+        case ex: Exception => throw new RuntimeException(ex)
+      }
+    }
+
+    def clear: Unit = stateTable.remove(ns.stringKey, addr.getId)
+
+    override def hashCode(): Int = Objects.hash(ns, addr)
+
+    override def equals(obj: Any): Boolean = {
+      if (obj == this) true
+
+      if (null == obj || getClass != obj.getClass) false
+
+      val that: AbstractState[_] = obj.asInstanceOf[AbstractState[_]]
+      Objects.equals(ns, that.ns) && Objects.equals(addr, that.addr)
+    }
+  }
+
+  private class HeapValueState[T](namespace: StateNamespace,
+      address: StateTag[ValueState[T]], coder: Coder[T])
+      extends AbstractState[T](namespace, address, coder) with ValueState[T] {
+
+    override def write(input: T): Unit = writeValue(input)
+
+    override def readLater: ValueState[T] = this
+
+    override def read: T = readValue
+  }
+
+  private class HeapMapState[MapKT, MapVT](namespace: StateNamespace,
+      address: StateTag[MapState[MapKT, MapVT]], mapKCoder: Coder[MapKT], 
mapVCoder: Coder[MapVT])
+      extends AbstractState[Map[MapKT, MapVT]](
+        namespace, address, MapCoder.of(mapKCoder, mapVCoder))
+      with MapState[MapKT, MapVT] {
+
+    private def readMap: Map[MapKT, MapVT] = {
+      implicit var map = super.readValue
+      if (map == null || map.size() == 0) {
+        map = new util.HashMap[MapKT, MapVT]
+      }
+
+      map
+    }
+
+    override def put(key: MapKT, value: MapVT): Unit = {
+      implicit var map = readMap
+      map.put(key, value)
+      super.writeValue(map)
+    }
+
+    override def putIfAbsent(key: MapKT, value: MapVT): ReadableState[MapVT] = 
{
+      implicit var map = readMap
+      implicit val previousVal = map.putIfAbsent(key, value)
+      super.writeValue(map)
+      new ReadableState[MapVT] {
+
+        override def readLater: ReadableState[MapVT] = this
+
+        override def read: MapVT = previousVal
+      }
+    }
+
+    override def remove(key: MapKT): Unit = {
+      implicit var map = readMap
+      map.remove(key)
+      super.writeValue(map)
+    }
+
+    override def get(key: MapKT): ReadableState[MapVT] = {
+      implicit var map = readMap
+      new ReadableState[MapVT] {
+
+        override def read: MapVT = map.get(key)
+
+        override def readLater: ReadableState[MapVT] = this
+      }
+    }
+
+    override def keys: ReadableState[Iterable[MapKT]] = {
+      implicit val map = readMap
+      new ReadableState[Iterable[MapKT]] {
+
+        override def readLater: ReadableState[Iterable[MapKT]] = this
+
+        override def read: Iterable[MapKT] = map.keySet()
+      }
+    }
+
+    override def values: ReadableState[Iterable[MapVT]] = {
+      implicit val map = readMap
+      new ReadableState[Iterable[MapVT]] {
+
+        override def readLater: ReadableState[Iterable[MapVT]] = this
+
+        override def read: Iterable[MapVT] = map.values()
+      }
+    }
+
+    override def entries: ReadableState[Iterable[Entry[MapKT, MapVT]]] = {
+      implicit var map = readMap
+      new ReadableState[Iterable[Entry[MapKT, MapVT]]] {
+
+        override def readLater: ReadableState[Iterable[Entry[MapKT, MapVT]]] = 
this
+
+        override def read: Iterable[Entry[MapKT, MapVT]] = map.entrySet()
+      }
+    }
+
+    override def clear: Unit = {
+      implicit var map = readMap
+      map.clear()
+      super.writeValue(map)
+    }
+}
+
+  private class HeapBagState[T](namespace: StateNamespace,
+      address: StateTag[BagState[T]], coder: Coder[T])
+      extends AbstractState[List[T]](namespace, address, ListCoder.of(coder)) 
with BagState[T] {
+
+    override def readLater: BagState[T] = this
+
+    override def add(input: T): Unit = {
+      val value: List[T] = read
+      value.add(input)
+      writeValue(value)
+    }
+
+    override def isEmpty: ReadableState[Boolean] = {
+      new ReadableState[Boolean] {
+
+        override def readLater: ReadableState[Boolean] = this
+
+        override def read: Boolean = stateTable.get(ns.stringKey, addr.getId) 
== null
+      }
+    }
+
+    override def read: List[T] = {
+      var value: List[T] = super.readValue
+      if (value == null || value.size() == 0) {
+        value = new ArrayList[T]
+      }
+
+      value
+    }
+  }
+
+  private class HeapSetState[T](namespace: StateNamespace,
+      address: StateTag[SetState[T]], coder: Coder[T])
+      extends AbstractState[Set[T]](namespace, address, SetCoder.of(coder)) 
with SetState[T] {
+
+    override def contains(t: T): ReadableState[Boolean] = {
+      implicit val set = read
+      new ReadableState[Boolean] {
+
+        override def readLater: ReadableState[Boolean] = this
+
+        override def read: Boolean = set.contains(t)
+      }
+    }
+
+    override def addIfAbsent(t: T): ReadableState[Boolean] = {
+      implicit val set = read
+      val success = set.add(t)
+      super.writeValue(set)
+      new ReadableState[Boolean] {
+
+        override def readLater: ReadableState[Boolean] = this
+
+        override def read: Boolean = success
+      }
+    }
+
+    override def remove(t: T): Unit = {
+      implicit var set = read
+      set.remove(t)
+      writeValue(set)
+    }
+
+    override def readLater: SetState[T] = this
+
+    override def add(value: T): Unit = {
+      implicit var set = read
+      set.add(value)
+      writeValue(set)
+    }
+
+    override def isEmpty: ReadableState[Boolean] = {
+      implicit val set = read
+      new ReadableState[Boolean] {
+
+        override def readLater: ReadableState[Boolean] = this
+
+        override def read: Boolean = set.isEmpty
+      }
+    }
+
+    override def read: Set[T] = {
+      var value: Set[T] = super.readValue
+      if (value == null || value.size() == 0) {
+        value = new util.HashSet[T]()
+      }
+
+      value
+    }
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala
new file mode 100644
index 0000000..db20d66
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import org.apache.gearpump.streaming.refactor.coder.{Coder, CoderException, 
CoderUtils}
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, 
StateInternalsFactory}
+import java.util._
+
+import com.google.common.collect.{HashBasedTable, Table}
+import org.apache.gearpump.util.LogUtil
+
+class HeapStateInternalsFactory[K](keyCoder: Coder[K],
+    map: Map[String, Table[String, String, Array[Byte]]])
+    extends StateInternalsFactory[K] with Serializable {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  private val kc: Coder[K] = keyCoder
+  private val perKeyState: Map[String, Table[String, String, Array[Byte]]] = 
map
+
+  def getKeyCoder: Coder[K] = {
+    this.kc
+  }
+
+  override def stateInternalsForKey(key: K): StateInternals = {
+    var keyBytes: Option[Array[Byte]] = None
+      if (key != null) {
+        keyBytes = Some(CoderUtils.encodeToByteArray(kc, key))
+      }
+
+    if (keyBytes.isEmpty) {
+      throw new RuntimeException("key bytes is null or empty, encode key 
occurs a error")
+    }
+
+    val keyBased64Str = Base64.getEncoder.encodeToString(keyBytes.get)
+    var stateTable: Table[String, String, Array[Byte]] = 
perKeyState.get(keyBased64Str)
+    if (stateTable == null) {
+      LOG.info("stateTable is null, will create!")
+      stateTable = HashBasedTable.create()
+      perKeyState.put(keyBased64Str, stateTable)
+    }
+
+    new HeapStateInternals[K](key, stateTable)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala
new file mode 100644
index 0000000..2f85dd9
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.{StateNamespace, StateTag}
+import org.apache.gearpump.streaming.refactor.state.api.{State, 
StateInternals, StateInternalsFactory}
+
+class HeapStateInternalsProxy[K](heapStateInternalsFactory: 
HeapStateInternalsFactory[K])
+  extends StateInternals with Serializable {
+
+  private val factory: HeapStateInternalsFactory[K] = heapStateInternalsFactory
+
+  @transient
+  private var currentKey: K = _
+
+  def getFactory: StateInternalsFactory[K] = {
+    factory
+  }
+
+  def getKeyCoder: Coder[K] = {
+    factory.getKeyCoder
+  }
+
+  override def getKey: K = {
+    currentKey
+  }
+
+  def setKey(key: K): Unit = {
+    currentKey = key
+  }
+
+  override def state[T <: State](namespace: StateNamespace, address: 
StateTag[T]): T = {
+    factory.stateInternalsForKey(currentKey).state(namespace, address)
+  }
+}

Reply via email to