[Gearpump 311] refactor state management

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/7068699d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/7068699d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/7068699d

Branch: refs/heads/state
Commit: 7068699d3e865f4d5bd49bcfb59f2cd4115ec125
Parents: b9f1086
Author: vinoyang <[email protected]>
Authored: Wed Jun 28 11:41:15 2017 +0800
Committer: vinoyang <[email protected]>
Committed: Sun Jul 23 09:59:05 2017 +0800

----------------------------------------------------------------------
 .../streaming/refactor/coder/AtomicCoder.java   |  48 ++
 .../refactor/coder/BigEndianIntegerCoder.java   |  83 ++++
 .../refactor/coder/BigEndianLongCoder.java      |  83 ++++
 .../refactor/coder/BigIntegerCoder.java         |  74 +++
 .../BufferedElementCountingOutputStream.java    | 125 +++++
 .../refactor/coder/ByteArrayCoder.java          |  94 ++++
 .../streaming/refactor/coder/ByteCoder.java     |  88 ++++
 .../streaming/refactor/coder/Coder.java         | 134 +++++
 .../refactor/coder/CoderException.java          |  32 ++
 .../streaming/refactor/coder/CoderUtils.java    | 286 +++++++++++
 .../streaming/refactor/coder/DoubleCoder.java   |  86 ++++
 .../ElementByteSizeObservableIterable.java      |  45 ++
 .../ElementByteSizeObservableIterator.java      |  30 ++
 .../refactor/coder/ElementByteSizeObserver.java |  66 +++
 .../streaming/refactor/coder/IterableCoder.java |  41 ++
 .../refactor/coder/IterableLikeCoder.java       | 238 +++++++++
 .../refactor/coder/IteratorObserver.java        |  56 +++
 .../streaming/refactor/coder/ListCoder.java     |  47 ++
 .../streaming/refactor/coder/MapCoder.java      | 138 ++++++
 .../streaming/refactor/coder/SetCoder.java      |  49 ++
 .../refactor/coder/StringUtf8Coder.java         |  99 ++++
 .../refactor/coder/StructuralByteArray.java     |  55 +++
 .../refactor/coder/StructuredCoder.java         |  95 ++++
 .../streaming/refactor/coder/VarInt.java        |  91 ++++
 .../streaming/refactor/coder/VarIntCoder.java   |  82 ++++
 .../streaming/refactor/coder/VarLongCoder.java  |  88 ++++
 .../streaming/refactor/coder/VoidCoder.java     |  66 +++
 .../dsl/window/impl/ReduceFnRunner.scala        |   6 +
 .../refactor/sink/DataSinkProcessor.scala       |  17 +
 .../state/InMemoryGlobalStateInternals.scala    | 269 +++++++++++
 .../refactor/state/RuntimeContext.scala         |   6 +
 .../streaming/refactor/state/StateBinder.scala  |  35 ++
 .../refactor/state/StateNamespace.scala         |  29 ++
 .../refactor/state/StateNamespaces.scala        |  61 +++
 .../streaming/refactor/state/StateSpecs.scala   | 213 ++++++++
 .../streaming/refactor/state/StateTags.scala    | 109 +++++
 .../streaming/refactor/state/StatefulTask.scala |  11 +
 .../streaming/refactor/state/api/BagState.scala |  27 ++
 .../refactor/state/api/CombiningState.scala     |  31 ++
 .../refactor/state/api/GroupingState.scala      |  29 ++
 .../streaming/refactor/state/api/MapState.scala |  39 ++
 .../refactor/state/api/ReadableState.scala      |  27 ++
 .../streaming/refactor/state/api/SetState.scala |  33 ++
 .../refactor/state/api/StateInternals.scala     |  29 ++
 .../state/api/StateInternalsFactory.scala       |  25 +
 .../refactor/state/api/ValueState.scala         |  27 ++
 .../state/heap/HeapStateInternals.scala         | 305 ++++++++++++
 .../state/heap/HeapStateInternalsFactory.scala  |  62 +++
 .../state/heap/HeapStateInternalsProxy.scala    |  52 ++
 .../state/heap/HeapStateInternalsSpec.scala     | 484 +++++++++++++++++++
 50 files changed, 4345 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java
new file mode 100644
index 0000000..e152b48
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Base class for coders that are not assembled from other coders: both
+ * {@link #getCoderArguments()} and {@link #getComponents()} report an empty list.
+ *
+ * <p>Equality is decided by class alone, so any two instances of the same concrete
+ * subclass are equal and share a hash code.
+ *
+ * @param <T> the type of value handled by this coder
+ */
+public abstract class AtomicCoder<T> extends StructuredCoder<T> {
+
+    /** Does nothing in this base class; it never throws. */
+    @Override
+    public void verifyDeterministic() {
+    }
+
+    /** @return an empty list */
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    /** @return an empty list */
+    @Override
+    public final List<? extends Coder<?>> getComponents() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @param other the object to compare with, may be {@code null}
+     * @return {@code true} only when {@code other} is non-null and has exactly this class
+     */
+    @Override
+    public final boolean equals(Object other) {
+        if (other == null) {
+            return false;
+        }
+        Class<?> ownType = getClass();
+        return ownType.equals(other.getClass());
+    }
+
+    /** @return the hash code of this coder's runtime class */
+    @Override
+    public final int hashCode() {
+        Class<?> ownType = getClass();
+        return ownType.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java
new file mode 100644
index 0000000..27ec539
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+/**
+ * Coder for {@link Integer} values. Values are written with
+ * {@link DataOutputStream#writeInt(int)} and read back with
+ * {@link DataInputStream#readInt()}, so every value occupies four bytes.
+ *
+ * <p>{@code null} values are rejected with a {@link CoderException}.
+ */
+public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
+
+    /** @return the shared instance of this coder */
+    public static BigEndianIntegerCoder of() {
+        return INSTANCE;
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    private static final BigEndianIntegerCoder INSTANCE = new BigEndianIntegerCoder();
+
+    private BigEndianIntegerCoder() {}
+
+    /**
+     * Writes {@code value} to {@code outStream} as a four-byte int.
+     *
+     * @param value the value to write; must not be {@code null}
+     * @param outStream the stream to write to
+     * @throws CoderException if {@code value} is {@code null} or the write fails
+     */
+    @Override
+    public void encode(Integer value, OutputStream outStream)
+            throws CoderException {
+        if (value == null) {
+            throw new CoderException("cannot encode a null Integer");
+        }
+
+        try {
+            new DataOutputStream(outStream).writeInt(value);
+        } catch (IOException e) {
+            // Report write failures as CoderException, the same way decode() and the
+            // sibling coders do. CoderException is a RuntimeException, so callers
+            // that caught the previous bare RuntimeException keep working.
+            throw new CoderException(e);
+        }
+    }
+
+    /**
+     * Reads one four-byte int from {@code inStream}.
+     *
+     * @param inStream the stream to read from
+     * @return the decoded value
+     * @throws CoderException if the stream ends early or any other I/O error occurs
+     */
+    @Override
+    public Integer decode(InputStream inStream)
+            throws CoderException {
+        try {
+            return new DataInputStream(inStream).readInt();
+        } catch (IOException e) {
+            // EOFException and UTFDataFormatException are IOExceptions too; every
+            // read problem is rebranded as a decoding failure.
+            throw new CoderException(e);
+        }
+    }
+
+    /** Does nothing; this coder never reports non-determinism. */
+    @Override
+    public void verifyDeterministic() {}
+
+    /** @return {@code true} */
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    /** @return {@code true}; the encoded size is a constant */
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Integer value) {
+        return true;
+    }
+
+    /**
+     * @param value the value whose encoded size is requested; must not be {@code null}
+     * @return 4, the number of bytes written by {@link #encode}
+     * @throws CoderException if {@code value} is {@code null}
+     */
+    @Override
+    protected long getEncodedElementByteSize(Integer value) {
+        if (value == null) {
+            throw new CoderException("cannot encode a null Integer");
+        }
+        return 4;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java
new file mode 100644
index 0000000..c788729
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+/**
+ * Coder for {@link Long} values. Values are written with
+ * {@link DataOutputStream#writeLong(long)} and read back with
+ * {@link DataInputStream#readLong()}, so every value occupies eight bytes.
+ *
+ * <p>{@code null} values are rejected with a {@link CoderException}.
+ */
+public class BigEndianLongCoder extends AtomicCoder<Long> {
+
+    /** @return the shared instance of this coder */
+    public static BigEndianLongCoder of() {
+        return INSTANCE;
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    private static final BigEndianLongCoder INSTANCE = new BigEndianLongCoder();
+
+    private BigEndianLongCoder() {}
+
+    /**
+     * Writes {@code value} to {@code outStream} as an eight-byte long.
+     *
+     * @param value the value to write; must not be {@code null}
+     * @param outStream the stream to write to
+     * @throws CoderException if {@code value} is {@code null} or the write fails
+     */
+    @Override
+    public void encode(Long value, OutputStream outStream)
+            throws CoderException {
+        if (value == null) {
+            throw new CoderException("cannot encode a null Long");
+        }
+        DataOutputStream dataOut = new DataOutputStream(outStream);
+        try {
+            dataOut.writeLong(value);
+        } catch (IOException ioe) {
+            throw new CoderException(ioe);
+        }
+    }
+
+    /**
+     * Reads one eight-byte long from {@code inStream}.
+     *
+     * @param inStream the stream to read from
+     * @return the decoded value
+     * @throws CoderException if the stream ends early or any other I/O error occurs
+     */
+    @Override
+    public Long decode(InputStream inStream)
+            throws CoderException {
+        DataInputStream dataIn = new DataInputStream(inStream);
+        try {
+            return dataIn.readLong();
+        } catch (IOException ioe) {
+            // EOFException and UTFDataFormatException are IOExceptions as well, so this
+            // single arm rebrands every read problem as a decoding failure.
+            throw new CoderException(ioe);
+        }
+    }
+
+    /** Does nothing; this coder never reports non-determinism. */
+    @Override
+    public void verifyDeterministic() {
+    }
+
+    /** @return {@code true} */
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    /** @return {@code true}; the encoded size is a constant */
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Long value) {
+        return true;
+    }
+
+    /**
+     * @param value the value whose encoded size is requested; must not be {@code null}
+     * @return 8, the number of bytes written by {@link #encode}
+     * @throws CoderException if {@code value} is {@code null}
+     */
+    @Override
+    protected long getEncodedElementByteSize(Long value) {
+        if (value != null) {
+            return 8;
+        }
+        throw new CoderException("cannot encode a null Long");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java
new file mode 100644
index 0000000..4a65992
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigInteger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Coder for {@link BigInteger} values. A value is converted with
+ * {@link BigInteger#toByteArray()} and the resulting bytes are handed to
+ * {@link ByteArrayCoder}; decoding reverses the two steps.
+ *
+ * <p>{@code null} values are rejected by {@code checkNotNull}.
+ */
+public class BigIntegerCoder extends AtomicCoder<BigInteger> {
+
+    /** @return the shared instance of this coder */
+    public static BigIntegerCoder of() {
+        return INSTANCE;
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    private static final BigIntegerCoder INSTANCE = new BigIntegerCoder();
+    private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+
+    private BigIntegerCoder() {
+    }
+
+    /** Builds the message used when a {@code null} value is passed in. */
+    private static String nullValueMessage() {
+        return String.format("cannot encode a null %s", BigInteger.class.getSimpleName());
+    }
+
+    /**
+     * Writes the byte-array form of {@code value} to {@code outStream}.
+     *
+     * @param value the value to write; must not be {@code null}
+     * @param outStream the stream to write to
+     */
+    @Override
+    public void encode(BigInteger value, OutputStream outStream)
+            throws CoderException {
+        checkNotNull(value, nullValueMessage());
+        byte[] rawBytes = value.toByteArray();
+        BYTE_ARRAY_CODER.encode(rawBytes, outStream);
+    }
+
+    /**
+     * Reads a byte array from {@code inStream} and rebuilds the {@link BigInteger}.
+     *
+     * @param inStream the stream to read from
+     * @return the decoded value
+     */
+    @Override
+    public BigInteger decode(InputStream inStream)
+            throws CoderException {
+        byte[] rawBytes = BYTE_ARRAY_CODER.decode(inStream);
+        return new BigInteger(rawBytes);
+    }
+
+    /** Delegates to the underlying {@link ByteArrayCoder}. */
+    @Override
+    public void verifyDeterministic() {
+        BYTE_ARRAY_CODER.verifyDeterministic();
+    }
+
+    /** @return {@code true} */
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    /** @return {@code true} */
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(BigInteger value) {
+        return true;
+    }
+
+    /**
+     * @param value the value whose encoded size is requested; must not be {@code null}
+     * @return the size the underlying {@link ByteArrayCoder} reports for the value's bytes
+     */
+    @Override
+    protected long getEncodedElementByteSize(BigInteger value) {
+        checkNotNull(value, nullValueMessage());
+        byte[] rawBytes = value.toByteArray();
+        return BYTE_ARRAY_CODER.getEncodedElementByteSize(rawBytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
new file mode 100644
index 0000000..119e6eb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link OutputStream} wrapper that buffers the bytes written after
+ * {@link #markElementStart()} and, whenever the buffer is emitted, first writes the
+ * number of elements marked since the previous emission via {@code VarInt.encode}.
+ * {@link #finish()} ends the stream by writing a {@code VarInt} zero.
+ *
+ * <p>While no element is pending ({@code count == 0}) writes bypass the buffer and go
+ * straight to the wrapped stream.
+ */
+public class BufferedElementCountingOutputStream extends OutputStream {
+    /** Buffer capacity, in bytes, used by the public constructor. */
+    public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private final ByteBuffer buffer;
+    private final OutputStream os;
+    private boolean finished;
+    // Elements marked since the buffer was last emitted; 0 means "write through".
+    private long count;
+
+    /**
+     * Wraps {@code os} with a buffer of {@link #DEFAULT_BUFFER_SIZE} bytes.
+     *
+     * @param os the stream that receives the output
+     */
+    public BufferedElementCountingOutputStream(OutputStream os) {
+        this(os, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Wraps {@code os} with a heap buffer of {@code bufferSize} bytes.
+     *
+     * @param os the stream that receives the output
+     * @param bufferSize capacity of the internal buffer, in bytes
+     */
+    BufferedElementCountingOutputStream(OutputStream os, int bufferSize) {
+        this.buffer = ByteBuffer.allocate(bufferSize);
+        this.os = os;
+        this.finished = false;
+        this.count = 0;
+    }
+
+    /**
+     * Flushes pending data and writes the terminating zero count. Calling it again
+     * after the stream is finished does nothing.
+     *
+     * @throws IOException if writing to the wrapped stream fails
+     */
+    public void finish() throws IOException {
+        if (finished) {
+            return;
+        }
+        flush();
+        // Finish the stream by stating that there are 0 elements that follow.
+        VarInt.encode(0, os);
+        finished = true;
+    }
+
+    /**
+     * Records that a new element is about to be written.
+     *
+     * @throws IOException if the stream has already been finished
+     */
+    public void markElementStart() throws IOException {
+        if (finished) {
+            throw new IOException("Stream has been finished. Can not add any more elements.");
+        }
+        count++;
+    }
+
+    /**
+     * Writes one byte, either into the buffer or directly to the wrapped stream.
+     *
+     * @throws IOException if the stream has been finished or the wrapped stream fails
+     */
+    @Override
+    public void write(int b) throws IOException {
+        if (finished) {
+            throw new IOException("Stream has been finished. Can not write any more data.");
+        }
+        if (count == 0) {
+            os.write(b);
+            return;
+        }
+
+        if (buffer.hasRemaining()) {
+            buffer.put((byte) b);
+        } else {
+            // Buffer is full: emit it (this resets count) and write this byte through.
+            outputBuffer();
+            os.write(b);
+        }
+    }
+
+    /**
+     * Writes {@code len} bytes of {@code b} starting at {@code off}, either into the
+     * buffer or directly to the wrapped stream.
+     *
+     * @throws IOException if the stream has been finished or the wrapped stream fails
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (finished) {
+            throw new IOException("Stream has been finished. Can not write any more data.");
+        }
+        if (count == 0) {
+            os.write(b, off, len);
+            return;
+        }
+
+        if (buffer.remaining() >= len) {
+            buffer.put(b, off, len);
+        } else {
+            // Not enough room: emit the buffer (this resets count) and write through.
+            outputBuffer();
+            os.write(b, off, len);
+        }
+    }
+
+    /**
+     * Emits any buffered data and flushes the wrapped stream; does nothing once the
+     * stream is finished.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (finished) {
+            return;
+        }
+        outputBuffer();
+        os.flush();
+    }
+
+    /** Finishes this stream and then closes the wrapped stream. */
+    @Override
+    public void close() throws IOException {
+        finish();
+        os.close();
+    }
+
+    // Output the buffer if it contains any data.
+    private void outputBuffer() throws IOException {
+        if (count > 0) {
+            VarInt.encode(count, os);
+            // We are using a heap based buffer and not a direct buffer so it is safe to access
+            // the underlying array.
+            os.write(buffer.array(), buffer.arrayOffset(), buffer.position());
+            buffer.clear();
+            // The buffer has been flushed so we must write to the underlying stream until
+            // we learn of the next element. We reset the count to zero marking that we should
+            // not use the buffer.
+            count = 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java
new file mode 100644
index 0000000..6b1af05
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.io.ByteStreams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Coder for {@code byte[]} values. The array length is written first with
+ * {@code VarInt.encode}, followed by the raw bytes.
+ *
+ * <p>{@code null} arrays are rejected with a {@link CoderException}.
+ */
+public class ByteArrayCoder extends AtomicCoder<byte[]> {
+
+    /** @return the shared instance of this coder */
+    public static ByteArrayCoder of() {
+        return INSTANCE;
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();
+
+    private ByteArrayCoder() {
+    }
+
+    /**
+     * Writes the length of {@code value} and then its contents to {@code outStream}.
+     *
+     * @param value the array to write; must not be {@code null}
+     * @param outStream the stream to write to
+     * @throws CoderException if {@code value} is {@code null} or the write fails
+     */
+    @Override
+    public void encode(byte[] value, OutputStream outStream)
+            throws CoderException {
+        if (value == null) {
+            throw new CoderException("cannot encode a null byte[]");
+        }
+
+        try {
+            VarInt.encode(value.length, outStream);
+            outStream.write(value);
+        } catch (IOException ioe) {
+            throw new CoderException(ioe);
+        }
+    }
+
+    /**
+     * Reads a length and then that many bytes from {@code inStream}.
+     *
+     * @param inStream the stream to read from
+     * @return the decoded array
+     * @throws CoderException if the length is negative or reading fails
+     */
+    @Override
+    public byte[] decode(InputStream inStream)
+            throws CoderException {
+        try {
+            int length = VarInt.decodeInt(inStream);
+            if (length < 0) {
+                throw new CoderException("invalid length " + length);
+            }
+            byte[] payload = new byte[length];
+            ByteStreams.readFully(inStream, payload);
+            return payload;
+        } catch (IOException ioe) {
+            throw new CoderException(ioe);
+        }
+    }
+
+    /** Does nothing; this coder never reports non-determinism. */
+    @Override
+    public void verifyDeterministic() {
+    }
+
+    /** @return {@code value} wrapped in a {@link StructuralByteArray} */
+    @Override
+    public Object structuralValue(byte[] value) {
+        return new StructuralByteArray(value);
+    }
+
+    /** @return {@code true} */
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(byte[] value) {
+        return true;
+    }
+
+    /**
+     * @param value the array whose encoded size is requested; must not be {@code null}
+     * @return the size of the length prefix plus the array length
+     * @throws CoderException if {@code value} is {@code null}
+     */
+    @Override
+    protected long getEncodedElementByteSize(byte[] value) {
+        if (value == null) {
+            throw new CoderException("cannot encode a null byte[]");
+        }
+        int payloadLength = value.length;
+        return VarInt.getLength(payloadLength) + payloadLength;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java
new file mode 100644
index 0000000..e3cb7e4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+/**
+ * Coder for {@link Byte} values; each value is written as a single byte.
+ *
+ * <p>{@code null} values are rejected with a {@link CoderException}.
+ */
+public class ByteCoder extends AtomicCoder<Byte> {
+
+    /** @return the shared instance of this coder */
+    public static ByteCoder of() {
+        return INSTANCE;
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    private static final ByteCoder INSTANCE = new ByteCoder();
+
+    private ByteCoder() {
+    }
+
+    /**
+     * Writes {@code value} to {@code outStream} as one byte.
+     *
+     * @param value the value to write; must not be {@code null}
+     * @param outStream the stream to write to
+     * @throws CoderException if {@code value} is {@code null} or the write fails
+     */
+    @Override
+    public void encode(Byte value, OutputStream outStream)
+            throws CoderException {
+        if (value == null) {
+            throw new CoderException("cannot encode a null Byte");
+        }
+        try {
+            outStream.write(value.byteValue());
+        } catch (IOException e) {
+            throw new CoderException(e);
+        }
+    }
+
+    /**
+     * Reads one byte from {@code inStream}.
+     *
+     * @param inStream the stream to read from
+     * @return the decoded value
+     * @throws CoderException if the stream is already at its end or reading fails
+     */
+    @Override
+    public Byte decode(InputStream inStream) throws CoderException {
+        try {
+            // value will be between 0-255, -1 for EOF
+            int value = inStream.read();
+            if (value == -1) {
+                throw new EOFException("EOF encountered decoding 1 byte from input stream");
+            }
+            return (byte) value;
+        } catch (IOException e) {
+            // EOFException is an IOException, so the end-of-stream case raised above
+            // is rebranded as a decoding failure here together with real I/O errors.
+            throw new CoderException(e);
+        }
+    }
+
+    /** Does nothing; this coder never reports non-determinism. */
+    @Override
+    public void verifyDeterministic() {
+    }
+
+    /** @return {@code true} */
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    /** @return {@code true}; the encoded size is a constant */
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Byte value) {
+        return true;
+    }
+
+    /**
+     * @param value the value whose encoded size is requested; must not be {@code null}
+     * @return 1, the number of bytes written by {@link #encode}
+     * @throws CoderException if {@code value} is {@code null}
+     */
+    @Override
+    protected long getEncodedElementByteSize(Byte value) {
+        if (value == null) {
+            throw new CoderException("cannot estimate size for unsupported null value");
+        }
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
new file mode 100644
index 0000000..e1999ed
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Base class for objects that write values of type {@code T} to an
+ * {@link OutputStream} and read them back from an {@link InputStream}.
+ *
+ * @param <T> the type of value being encoded and decoded
+ */
+public abstract class Coder<T> implements Serializable {
+
+    /**
+     * Writes {@code value} to {@code outStream}.
+     *
+     * @throws CoderException if the value cannot be encoded
+     */
+    public abstract void encode(T value, OutputStream outStream)
+            throws CoderException;
+
+    /**
+     * Reads one value from {@code inStream}.
+     *
+     * @throws CoderException if the value cannot be decoded
+     */
+    public abstract T decode(InputStream inStream) throws CoderException;
+
+    /** @return the coder arguments of this coder */
+    public abstract List<? extends Coder<?>> getCoderArguments();
+
+    /**
+     * Checks this coder for determinism.
+     *
+     * @throws Coder.NonDeterministicException if the coder is not deterministic
+     */
+    public abstract void verifyDeterministic() throws Coder.NonDeterministicException;
+
+    /**
+     * Calls {@link #verifyDeterministic()} on each of {@code coders} in order. The first
+     * failure is rethrown as a new exception that names {@code target}, carries
+     * {@code message} as its reason and keeps the original failure as its cause.
+     *
+     * @param target the coder on whose behalf the check is made
+     * @param message the reason reported if a check fails
+     * @param coders the coders to check
+     * @throws NonDeterministicException if any of {@code coders} fails its check
+     */
+    public static void verifyDeterministic(Coder<?> target, String message, Iterable<Coder<?>> coders)
+            throws NonDeterministicException {
+        for (Coder<?> coder : coders) {
+            try {
+                coder.verifyDeterministic();
+            } catch (NonDeterministicException e) {
+                throw new NonDeterministicException(target, message, e);
+            }
+        }
+    }
+
+    /** Varargs form of {@link #verifyDeterministic(Coder, String, Iterable)}. */
+    public static void verifyDeterministic(Coder<?> target, String message, Coder<?>... coders)
+            throws NonDeterministicException {
+        verifyDeterministic(target, message, Arrays.asList(coders));
+    }
+
+    /** @return {@code false} unless overridden */
+    public boolean consistentWithEquals() {
+        return false;
+    }
+
+    /**
+     * Returns an object to use when comparing values structurally. A non-null value is
+     * returned as is when {@link #consistentWithEquals()} is {@code true}; otherwise the
+     * value is encoded and its bytes are wrapped in a {@link StructuralByteArray}.
+     *
+     * @throws IllegalArgumentException if encoding the value fails for any reason
+     */
+    public Object structuralValue(T value) {
+        if (value != null && consistentWithEquals()) {
+            return value;
+        } else {
+            try {
+                ByteArrayOutputStream os = new ByteArrayOutputStream();
+                encode(value, os);
+                return new StructuralByteArray(os.toByteArray());
+            } catch (Exception exn) {
+                throw new IllegalArgumentException(
+                        "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+            }
+        }
+    }
+
+    /** @return {@code false} unless overridden */
+    public boolean isRegisterByteSizeObserverCheap(T value) {
+        return false;
+    }
+
+    /** Reports the encoded size of {@code value} to {@code observer}. */
+    public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) {
+        observer.update(getEncodedElementByteSize(value));
+    }
+
+    /**
+     * Returns the number of bytes {@link #encode} produces for {@code value}. This
+     * default encodes the value into a counting stream that discards its input.
+     *
+     * @throws IllegalArgumentException if encoding the value fails for any reason
+     */
+    protected long getEncodedElementByteSize(T value) {
+        try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
+            encode(value, os);
+            return os.getCount();
+        } catch (Exception exn) {
+            throw new IllegalArgumentException(
+                    "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+        }
+    }
+
+    /**
+     * Thrown when a coder is found not to be deterministic. Carries the coder and one
+     * or more reasons.
+     */
+    public static class NonDeterministicException extends RuntimeException {
+        private final Coder<?> coder;
+        private final List<String> reasons;
+
+        public NonDeterministicException(
+                Coder<?> coder, String reason, NonDeterministicException e) {
+            this(coder, Arrays.asList(reason), e);
+        }
+
+        public NonDeterministicException(Coder<?> coder, String reason) {
+            this(coder, Arrays.asList(reason), null);
+        }
+
+        public NonDeterministicException(Coder<?> coder, List<String> reasons) {
+            this(coder, reasons, null);
+        }
+
+        /**
+         * @param coder the coder that is not deterministic
+         * @param reasons why it is not deterministic; must not be empty
+         * @param cause the underlying failure, or {@code null}
+         * @throws IllegalArgumentException if {@code reasons} is empty
+         */
+        public NonDeterministicException(
+                Coder<?> coder,
+                List<String> reasons,
+                NonDeterministicException cause) {
+            super(cause);
+            checkArgument(reasons.size() > 0, "Reasons must not be empty.");
+            this.reasons = reasons;
+            this.coder = coder;
+        }
+
+        /** @return the reasons given at construction */
+        public Iterable<String> getReasons() {
+            return reasons;
+        }
+
+        /** @return the coder followed by the reasons, one per indented line */
+        @Override
+        public String getMessage() {
+            // "%n" is only expanded inside a format string. Passing it to Joiner
+            // directly printed a literal "%n" between reasons, so expand it first.
+            String separator = String.format("%n  ");
+            return String.format("%s is not deterministic because:%n  %s",
+                    coder, Joiner.on(separator).join(reasons));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java
new file mode 100644
index 0000000..8213e42
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.coder;
+
+/**
+ * Unchecked exception thrown by the coders in this package when a value cannot
+ * be encoded or decoded.
+ */
+public class CoderException extends RuntimeException {
+
+    /** Wraps {@code cause} without supplying a separate message. */
+    public CoderException(Throwable cause) {
+        super(cause);
+    }
+
+    /** Creates an exception that carries only a description. */
+    public CoderException(String message) {
+        super(message);
+    }
+
+    /** Creates an exception that carries a description and its underlying cause. */
+    public CoderException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java
new file mode 100644
index 0000000..2126c48
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.io.BaseEncoding;
+
+import java.io.*;
+import java.lang.ref.SoftReference;
+
+/**
+ * Static helpers that run a {@link Coder} against in-memory byte arrays, together
+ * with the stream wrappers those helpers use.
+ */
+public final class CoderUtils {
+    private CoderUtils() {
+    }  // Non-instantiable
+
+    /** Per-thread scratch stream, held through a {@link SoftReference}. */
+    private static final ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
+            threadLocalOutputStream = new ThreadLocal<>();
+
+    /** Whether the current thread's scratch stream is in use; initially false. */
+    private static final ThreadLocal<Boolean> threadLocalOutputStreamInUse =
+            new ThreadLocal<Boolean>() {
+                @Override
+                protected Boolean initialValue() {
+                    return false;
+                }
+            };
+
+    /**
+     * Encodes {@code value} with {@code coder} and returns the resulting bytes.
+     *
+     * <p>The outermost call on a thread reuses that thread's scratch stream. A call
+     * made while the scratch stream is already in use allocates its own stream.
+     * The returned array may be the stream's current buffer rather than a copy;
+     * see {@link ExposedByteArrayOutputStream#toByteArray()}.
+     *
+     * @throws IllegalArgumentException if the coder throws a {@link CoderException}
+     */
+    public static <T> byte[] encodeToByteArray(Coder<T> coder, T value)
+            throws CoderException {
+        if (threadLocalOutputStreamInUse.get()) {
+            // encodeToByteArray() is called recursively and the thread local
+            // stream is in use, allocating a new one.
+            ByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
+            encodeToSafeStream(coder, value, stream);
+            return stream.toByteArray();
+        } else {
+            threadLocalOutputStreamInUse.set(true);
+            try {
+                ByteArrayOutputStream stream = getThreadLocalOutputStream();
+                encodeToSafeStream(coder, value, stream);
+                return stream.toByteArray();
+            } finally {
+                // Always release the scratch stream, even when encoding fails.
+                threadLocalOutputStreamInUse.set(false);
+            }
+        }
+    }
+
+    /**
+     * Encodes {@code value} into {@code stream} through an
+     * {@link UnownedOutputStream}, so the coder cannot close the stream.
+     *
+     * <p>NOTE(review): the message mentions an IOException although the caught
+     * type is {@link CoderException}; kept as is because callers may depend on it.
+     */
+    private static <T> void encodeToSafeStream(
+            Coder<T> coder, T value, OutputStream stream) throws CoderException {
+        try {
+            coder.encode(value, new UnownedOutputStream(stream));
+        } catch (CoderException exn) {
+            throw new IllegalArgumentException(
+                    "Forbidden IOException when writing to OutputStream", exn);
+        }
+    }
+
+    /**
+     * Decodes a value from {@code encodedValue} and checks that every byte was
+     * consumed.
+     *
+     * @throws CoderException           if bytes remain after decoding
+     * @throws IllegalArgumentException if the coder throws a {@link CoderException}
+     */
+    public static <T> T decodeFromByteArray(
+            Coder<T> coder, byte[] encodedValue) throws CoderException {
+        try (ExposedByteArrayInputStream stream =
+                     new ExposedByteArrayInputStream(encodedValue)) {
+            T result = decodeFromSafeStream(coder, stream);
+            if (stream.available() != 0) {
+                throw new CoderException(
+                        stream.available() + " unexpected extra bytes after decoding " + result);
+            }
+            return result;
+        }
+    }
+
+    /**
+     * Decodes a value from {@code stream} through an {@link UnownedInputStream},
+     * so the coder cannot close, mark or reset the stream.
+     */
+    private static <T> T decodeFromSafeStream(
+            Coder<T> coder, InputStream stream) throws CoderException {
+        try {
+            return coder.decode(new UnownedInputStream(stream));
+        } catch (CoderException exn) {
+            throw new IllegalArgumentException(
+                    "Forbidden IOException when reading from InputStream", exn);
+        }
+    }
+
+    /**
+     * Returns the current thread's scratch stream after resetting it, creating a
+     * new one when none exists or the soft reference has been cleared.
+     */
+    private static ByteArrayOutputStream getThreadLocalOutputStream() {
+        SoftReference<ExposedByteArrayOutputStream> refStream =
+                threadLocalOutputStream.get();
+        ExposedByteArrayOutputStream stream = refStream == null ? null : refStream.get();
+        if (stream == null) {
+            stream = new ExposedByteArrayOutputStream();
+            threadLocalOutputStream.set(new SoftReference<>(stream));
+        }
+        stream.reset();
+        return stream;
+    }
+
+    /** Copies {@code value} by encoding it and decoding the resulting bytes. */
+    public static <T> T clone(Coder<T> coder, T value) throws CoderException {
+        return decodeFromByteArray(coder, encodeToByteArray(coder, value));
+    }
+
+    /** Encodes {@code value} and returns the bytes as unpadded URL-safe base64. */
+    public static <T> String encodeToBase64(Coder<T> coder, T value)
+            throws CoderException {
+        byte[] rawValue = encodeToByteArray(coder, value);
+        return BaseEncoding.base64Url().omitPadding().encode(rawValue);
+    }
+
+    /**
+     * Decodes a value from unpadded URL-safe base64 text. Unlike
+     * {@link #decodeFromByteArray}, no check for trailing bytes is made here.
+     */
+    public static <T> T decodeFromBase64(Coder<T> coder, String encodedValue)
+            throws CoderException {
+        return decodeFromSafeStream(
+                coder,
+                new ByteArrayInputStream(
+                        BaseEncoding.base64Url().omitPadding().decode(encodedValue)));
+    }
+
+    /**
+     * A {@link ByteArrayOutputStream} that can adopt a caller's array as its
+     * buffer and hand its buffer out without copying.
+     */
+    public static class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
+
+        /** The stream's own buffer while {@code buf} points at an adopted array. */
+        private byte[] swappedBuffer;
+
+        /** Set once any regular write path has been taken since the last reset. */
+        private boolean isFallback = false;
+
+        /**
+         * Switches to regular copying behaviour. If an adopted array is currently
+         * installed, the stream's own buffer is restored and the adopted bytes are
+         * copied into it.
+         */
+        private void fallback() {
+            isFallback = true;
+            if (swappedBuffer != null) {
+                // swappedBuffer != null means buf is actually provided by the
+                // caller of writeAndOwn(), while swappedBuffer is the original buffer.
+                // Recover the buffer and copy the bytes from buf.
+                byte[] tempBuffer = buf;
+                count = 0;
+                buf = swappedBuffer;
+                super.write(tempBuffer, 0, tempBuffer.length);
+                swappedBuffer = null;
+            }
+        }
+
+        /**
+         * Writes {@code b}. When the stream is empty the array itself is adopted
+         * as the buffer instead of being copied; otherwise the bytes are appended
+         * as a regular write. An empty array is ignored.
+         */
+        public void writeAndOwn(byte[] b) throws IOException {
+            if (b.length == 0) {
+                return;
+            }
+            if (count == 0) {
+                // Optimized first-time whole write.
+                // The original buffer will be swapped to swappedBuffer, while the
+                // input b is used as buf.
+                swappedBuffer = buf;
+                buf = b;
+                count = b.length;
+            } else {
+                fallback();
+                super.write(b);
+            }
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) {
+            fallback();
+            super.write(b, off, len);
+        }
+
+        @Override
+        public void write(int b) {
+            fallback();
+            super.write(b);
+        }
+
+        /**
+         * Returns the written bytes. The current buffer itself is returned when the
+         * stream is non-empty and no regular write has happened since the last
+         * reset; otherwise a copy is returned.
+         */
+        @Override
+        public byte[] toByteArray() {
+            // Note: count == buf.length is not a correct criteria to "return buf;",
+            // because the internal buf may be reused after reset().
+            if (!isFallback && count > 0) {
+                return buf;
+            } else {
+                return super.toByteArray();
+            }
+        }
+
+        /**
+         * Empties the stream. If an adopted array is installed, the stream's own
+         * buffer is put back so the adopted array is no longer written to.
+         */
+        @Override
+        public void reset() {
+            if (count == 0) {
+                return;
+            }
+            count = 0;
+            if (isFallback) {
+                isFallback = false;
+            } else {
+                buf = swappedBuffer;
+                swappedBuffer = null;
+            }
+        }
+    }
+
+    /**
+     * A {@link ByteArrayInputStream} that can return its backing array without
+     * copying and whose {@link #close()} declares no checked exception.
+     */
+    public static class ExposedByteArrayInputStream extends ByteArrayInputStream {
+
+        public ExposedByteArrayInputStream(byte[] buf) {
+            super(buf);
+        }
+
+        /**
+         * Returns all remaining bytes and marks them consumed. When nothing has
+         * been read yet and the stream spans the whole backing array, the backing
+         * array itself is returned; otherwise the remaining bytes are copied.
+         */
+        public byte[] readAll() throws IOException {
+            if (pos == 0 && count == buf.length) {
+                pos = count;
+                return buf;
+            }
+            byte[] ret = new byte[count - pos];
+            super.read(ret);
+            return ret;
+        }
+
+        @Override
+        public void close() {
+            try {
+                super.close();
+            } catch (IOException exn) {
+                throw new RuntimeException("Unexpected IOException closing ByteArrayInputStream", exn);
+            }
+        }
+    }
+
+    /**
+     * An output stream wrapper handed to code that must not close the underlying
+     * stream; {@link #close()} always throws.
+     */
+    public static class UnownedOutputStream extends FilterOutputStream {
+        public UnownedOutputStream(OutputStream delegate) {
+            super(delegate);
+        }
+
+        /**
+         * Forwards the whole range to the delegate in a single call.
+         *
+         * <p>The inherited {@link FilterOutputStream#write(byte[], int, int)}
+         * calls {@code write(int)} once per byte, which turns every bulk write
+         * into {@code len} separate calls on the delegate.
+         */
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            out.write(b, off, len);
+        }
+
+        @Override
+        public void close() throws IOException {
+            throw new UnsupportedOperationException("Caller does not own the underlying output stream "
+                    + " and should not call close().");
+        }
+
+        /** Two wrappers are equal when their delegates are equal. */
+        @Override
+        public boolean equals(Object obj) {
+            return obj instanceof UnownedOutputStream
+                    && ((UnownedOutputStream) obj).out.equals(out);
+        }
+
+        @Override
+        public int hashCode() {
+            return out.hashCode();
+        }
+
+    }
+
+    /**
+     * An input stream wrapper handed to code that must not close, mark or reset
+     * the underlying stream; those operations always throw.
+     */
+    public static class UnownedInputStream extends FilterInputStream {
+        public UnownedInputStream(InputStream delegate) {
+            super(delegate);
+        }
+
+        @Override
+        public void close() throws IOException {
+            throw new UnsupportedOperationException("Caller does not own the underlying input stream "
+                    + " and should not call close().");
+        }
+
+        /** Two wrappers are equal when their delegates are equal. */
+        @Override
+        public boolean equals(Object obj) {
+            return obj instanceof UnownedInputStream
+                    && ((UnownedInputStream) obj).in.equals(in);
+        }
+
+        @Override
+        public int hashCode() {
+            return in.hashCode();
+        }
+
+        @SuppressWarnings("UnsynchronizedOverridesSynchronized")
+        @Override
+        public void mark(int readlimit) {
+            throw new UnsupportedOperationException("Caller does not own the underlying input stream "
+                    + " and should not call mark().");
+        }
+
+        @Override
+        public boolean markSupported() {
+            return false;
+        }
+
+        @SuppressWarnings("UnsynchronizedOverridesSynchronized")
+        @Override
+        public void reset() throws IOException {
+            throw new UnsupportedOperationException("Caller does not own the underlying input stream "
+                    + " and should not call reset().");
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java
new file mode 100644
index 0000000..981bee2
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+/**
+ * An {@link AtomicCoder} for {@link Double} values that writes and reads them
+ * with {@link DataOutputStream#writeDouble} and {@link DataInputStream#readDouble}.
+ */
+public class DoubleCoder extends AtomicCoder<Double> {
+
+    private static final DoubleCoder INSTANCE = new DoubleCoder();
+
+    private DoubleCoder() {
+    }
+
+    /** Returns the shared {@code DoubleCoder} instance. */
+    public static DoubleCoder of() {
+        return INSTANCE;
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    /** Rejects {@code null}, which this coder cannot encode. */
+    private static void rejectNull(Double candidate) {
+        if (candidate == null) {
+            throw new CoderException("cannot encode a null Double");
+        }
+    }
+
+    /**
+     * Writes {@code value} to {@code outStream} as eight bytes.
+     *
+     * @throws CoderException if {@code value} is null or the write fails
+     */
+    @Override
+    public void encode(Double value, OutputStream outStream)
+            throws CoderException {
+        rejectNull(value);
+        DataOutputStream sink = new DataOutputStream(outStream);
+        try {
+            sink.writeDouble(value.doubleValue());
+        } catch (IOException failure) {
+            throw new CoderException(failure);
+        }
+    }
+
+    /**
+     * Reads one double from {@code inStream}.
+     *
+     * @throws CoderException if the stream ends early or the read fails
+     */
+    @Override
+    public Double decode(InputStream inStream)
+            throws CoderException {
+        DataInputStream source = new DataInputStream(inStream);
+        try {
+            return Double.valueOf(source.readDouble());
+        } catch (IOException failure) {
+            // Covers EOFException and UTFDataFormatException as well: every I/O
+            // failure is rebranded as a CoderException.
+            throw new CoderException(failure);
+        }
+    }
+
+    /** Always throws: this coder is reported as non-deterministic. */
+    @Override
+    public void verifyDeterministic() {
+        throw new NonDeterministicException(this,
+                "Floating point encodings are not guaranteed to be deterministic.");
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(Double value) {
+        return true;
+    }
+
+    /**
+     * Returns the fixed encoded size of eight bytes.
+     *
+     * @throws CoderException if {@code value} is null
+     */
+    @Override
+    protected long getEncodedElementByteSize(Double value) {
+        rejectNull(value);
+        return 8;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java
new file mode 100644
index 0000000..29b4aa5
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Observer;
+
+/**
+ * An {@link Iterable} that collects {@link Observer}s and attaches them to the
+ * next iterator it hands out.
+ *
+ * @param <V>      the element type
+ * @param <InputT> the observable iterator type produced by this iterable
+ */
+public abstract class ElementByteSizeObservableIterable<
+        V, InputT extends ElementByteSizeObservableIterator<V>>
+        implements Iterable<V> {
+    // Observers waiting to be attached to the next iterator.
+    private final List<Observer> observers = new ArrayList<>();
+
+    /** Creates a fresh iterator; observers are attached by {@link #iterator()}. */
+    protected abstract InputT createIterator();
+
+    /** Queues {@code observer} for the next iterator returned by {@link #iterator()}. */
+    public void addObserver(Observer observer) {
+        observers.add(observer);
+    }
+
+    /**
+     * Returns a new iterator carrying every queued observer. The queue is
+     * emptied afterwards, so later iterators start without observers.
+     */
+    @Override
+    public InputT iterator() {
+        InputT fresh = createIterator();
+        for (int i = 0; i < observers.size(); i++) {
+            fresh.addObserver(observers.get(i));
+        }
+        observers.clear();
+        return fresh;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java
new file mode 100644
index 0000000..946882b
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Iterator;
+import java.util.Observable;
+
+/**
+ * An {@link Iterator} that is also an {@link Observable}, so it can report the
+ * byte size of the values it returns to registered observers.
+ *
+ * @param <V> the element type
+ */
+public abstract class ElementByteSizeObservableIterator<V>
+        extends Observable implements Iterator<V> {
+    /**
+     * Notifies all observers with {@code byteSize}, passed as a {@link Long}.
+     */
+    protected final void notifyValueReturned(long byteSize) {
+        Long reportedSize = Long.valueOf(byteSize);
+        setChanged();
+        notifyObservers(reportedSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java
new file mode 100644
index 0000000..5464067
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Observable;
+import java.util.Observer;
+
+/**
+ * An {@link Observer} that accumulates reported byte sizes, scaled by a
+ * configurable factor, and passes the total to {@link #reportElementSize}
+ * whenever {@link #advance()} is called.
+ */
+public abstract class ElementByteSizeObserver implements Observer {
+    private double scalingFactor = 1.0;
+    private long totalSize = 0;
+    private boolean isLazy = false;
+
+    public ElementByteSizeObserver() {
+    }
+
+    /** Receives the accumulated size each time {@link #advance()} runs. */
+    protected abstract void reportElementSize(long elementByteSize);
+
+    /** Sets the lazy flag; it is cleared again by {@link #advance()}. */
+    public void setLazy() {
+        isLazy = true;
+    }
+
+    /** Returns whether the lazy flag is currently set. */
+    public boolean getIsLazy() {
+        return isLazy;
+    }
+
+    /** Sets the factor applied to every subsequently reported size. */
+    public void setScalingFactor(double scalingFactor) {
+        this.scalingFactor = scalingFactor;
+    }
+
+    /** Shorthand for {@link #update(Observable, Object)} with no source. */
+    public void update(Object obj) {
+        update(null, obj);
+    }
+
+    /**
+     * Adds the scaled size {@code obj} to the running total.
+     *
+     * @param obs ignored
+     * @param obj the size, as a {@link Long} or an {@link Integer}
+     * @throws AssertionError if {@code obj} is of any other type
+     */
+    @Override
+    public void update(Observable obs, Object obj) {
+        boolean isSupported = obj instanceof Long || obj instanceof Integer;
+        if (!isSupported) {
+            throw new AssertionError("unexpected parameter object");
+        }
+        // The compound assignment adds in double precision and then narrows
+        // the sum back to long.
+        totalSize += scalingFactor * ((Number) obj).longValue();
+    }
+
+    /**
+     * Reports the accumulated total, then resets the total to zero and clears
+     * the lazy flag.
+     */
+    public void advance() {
+        reportElementSize(totalSize);
+        isLazy = false;
+        totalSize = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java
new file mode 100644
index 0000000..d069068
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.List;
+
+/**
+ * An {@link IterableLikeCoder} for plain {@link Iterable}s; decoded values are
+ * returned as the list of decoded elements.
+ *
+ * @param <T> the element type
+ */
+public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
+
+    /** Creates a coder whose elements are handled by {@code elemCoder}. */
+    protected IterableCoder(Coder<T> elemCoder) {
+        super(elemCoder, "Iterable");
+    }
+
+    /** Returns an {@code IterableCoder} that uses {@code elemCoder} for elements. */
+    public static <T> IterableCoder<T> of(Coder<T> elemCoder) {
+        return new IterableCoder<>(elemCoder);
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+    // Internal operations below here.
+
+    /** Returns the decoded list itself; no copy is made. */
+    @Override
+    protected final Iterable<T> decodeToIterable(List<T> decodedElements) {
+        return decodedElements;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java
new file mode 100644
index 0000000..5bb6c66
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+import java.util.*;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Base class for coders of {@link Iterable}-like values whose elements are all
+ * handled by a single element coder.
+ *
+ * <p>Encoding: a {@link Collection} is written as a 4-byte size followed by that
+ * many elements. Any other iterable is written as the int {@code -1} followed
+ * by the output of a {@link BufferedElementCountingOutputStream}, which decode
+ * reads back as var-int counted blocks ending with a zero count.
+ *
+ * @param <T>         the element type
+ * @param <IterableT> the iterable type being coded
+ */
+public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
+        extends StructuredCoder<IterableT> {
+    /** Returns the coder used for the individual elements. */
+    public Coder<T> getElemCoder() {
+        return elementCoder;
+    }
+
+    /** Builds the decoded value from the list of decoded elements. */
+    protected abstract IterableT decodeToIterable(List<T> decodedElements);
+
+    /////////////////////////////////////////////////////////////////////////////
+    // Internal operations below here.
+
+    private final Coder<T> elementCoder;
+    private final String iterableName;
+
+    /**
+     * @param elementCoder coder for the elements; must not be null
+     * @param iterableName name used in error messages; must not be null
+     * @throws IllegalArgumentException if either argument is null
+     */
+    protected IterableLikeCoder(Coder<T> elementCoder, String iterableName) {
+        checkArgument(elementCoder != null,
+                "element Coder for IterableLikeCoder must not be null");
+        checkArgument(iterableName != null,
+                "iterable name for IterableLikeCoder must not be null");
+        this.elementCoder = elementCoder;
+        this.iterableName = iterableName;
+    }
+
+    /**
+     * Encodes {@code iterable} to {@code outStream}.
+     *
+     * @throws CoderException if {@code iterable} is null or writing fails
+     */
+    @Override
+    public void encode(IterableT iterable, OutputStream outStream) {
+        if (iterable == null) {
+            throw new CoderException("cannot encode a null " + iterableName);
+        }
+        DataOutputStream dataOutStream = new DataOutputStream(outStream);
+        try {
+            if (iterable instanceof Collection) {
+                // We can know the size of the Iterable.  Use an encoding with a
+                // leading size field, followed by that many elements.
+                Collection<T> collection = (Collection<T>) iterable;
+                dataOutStream.writeInt(collection.size());
+                for (T elem : collection) {
+                    elementCoder.encode(elem, dataOutStream);
+                }
+            } else {
+                // We don't know the size without traversing it so use a fixed
+                // size buffer and encode as many elements as possible into it
+                // before outputting the size followed by the elements.
+                dataOutStream.writeInt(-1);
+                BufferedElementCountingOutputStream countingOutputStream =
+                        new BufferedElementCountingOutputStream(dataOutStream);
+                for (T elem : iterable) {
+                    countingOutputStream.markElementStart();
+                    elementCoder.encode(elem, countingOutputStream);
+                }
+                countingOutputStream.finish();
+            }
+            // Make sure all our output gets pushed to the underlying outStream.
+            dataOutStream.flush();
+        } catch (IOException e) {
+            // Same wrapping as the other coders in this package; CoderException
+            // is itself a RuntimeException.
+            throw new CoderException(e);
+        }
+    }
+
+    /**
+     * Decodes a value previously written by {@link #encode}.
+     *
+     * @throws CoderException if reading fails
+     */
+    @Override
+    public IterableT decode(InputStream inStream) {
+        try {
+            DataInputStream dataInStream = new DataInputStream(inStream);
+            int size = dataInStream.readInt();
+            if (size >= 0) {
+                List<T> elements = new ArrayList<>(size);
+                for (int i = 0; i < size; i++) {
+                    elements.add(elementCoder.decode(dataInStream));
+                }
+                return decodeToIterable(elements);
+            }
+            List<T> elements = new ArrayList<>();
+            // We don't know the size a priori.  Check if we're done with
+            // each block of elements.
+            long count = VarInt.decodeLong(dataInStream);
+            while (count > 0L) {
+                elements.add(elementCoder.decode(dataInStream));
+                --count;
+                if (count == 0L) {
+                    count = VarInt.decodeLong(dataInStream);
+                }
+            }
+            return decodeToIterable(elements);
+        } catch (IOException e) {
+            throw new CoderException(e);
+        }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Arrays.asList(elementCoder);
+    }
+
+    /** Always throws: element ordering cannot be guaranteed. */
+    @Override
+    public void verifyDeterministic() throws Coder.NonDeterministicException {
+        throw new NonDeterministicException(this,
+                "IterableLikeCoder can not guarantee deterministic ordering.");
+    }
+
+    // NOTE(review): inside this class the simple names ElementByteSizeObservableIterable,
+    // ElementByteSizeObservableIterator and IteratorObserver resolve to the nested
+    // types declared below, not to the same-named top-level classes of this
+    // package. Confirm whether both sets are meant to exist.
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(
+            IterableT iterable) {
+        return iterable instanceof ElementByteSizeObservableIterable;
+    }
+
+    /**
+     * Reports the encoded size of {@code iterable} to {@code observer}. For an
+     * {@link ElementByteSizeObservableIterable} the observer is marked lazy and
+     * receives the element sizes as the iterable is later iterated.
+     *
+     * @throws CoderException if {@code iterable} is null
+     */
+    @Override
+    public void registerByteSizeObserver(
+            IterableT iterable, ElementByteSizeObserver observer) {
+        if (iterable == null) {
+            throw new CoderException("cannot encode a null Iterable");
+        }
+
+        if (iterable instanceof ElementByteSizeObservableIterable) {
+            observer.setLazy();
+            ElementByteSizeObservableIterable<?, ?> observableIterable =
+                    (ElementByteSizeObservableIterable<?, ?>) iterable;
+            observableIterable.addObserver(
+                    new IteratorObserver(observer, iterable instanceof Collection));
+        } else {
+            if (iterable instanceof Collection) {
+                // We can know the size of the Iterable.  Use an encoding with a
+                // leading size field, followed by that many elements.
+                Collection<T> collection = (Collection<T>) iterable;
+                observer.update(4L);
+                for (T elem : collection) {
+                    elementCoder.registerByteSizeObserver(elem, observer);
+                }
+            } else {
+                // TODO: (BEAM-1537) Update to use an accurate count depending on
+                // size and count, currently we are under estimating the size by up
+                // to 10 bytes per block of data since we are not encoding the count
+                // prefix which occurs at most once per 64k of data and is upto
+                // 10 bytes long. Since we include the total count we can upper
+                // bound the underestimate to be 10 / 65536 ~= 0.0153% of the
+                // actual size.
+                observer.update(4L);
+                long count = 0;
+                for (T elem : iterable) {
+                    count += 1;
+                    elementCoder.registerByteSizeObserver(elem, observer);
+                }
+                if (count > 0) {
+                    // Update the length based upon the number of counted
+                    // elements, this helps eliminate the case where all the
+                    // elements are encoded in the first block and it is quite
+                    // short (e.g. Long.MAX_VALUE nulls encoded with VoidCoder).
+                    observer.update(VarInt.getLength(count));
+                }
+                // Update with the terminator byte.
+                observer.update(1L);
+            }
+        }
+    }
+
+    /**
+     * Forwards element sizes from an observable iterator to an outer observer,
+     * adding the fixed overhead reported in the constructor. Static because it
+     * never touches the enclosing coder.
+     */
+    private static class IteratorObserver implements Observer {
+        private final ElementByteSizeObserver outerObserver;
+        private final boolean countable;
+
+        public IteratorObserver(ElementByteSizeObserver outerObserver,
+                                boolean countable) {
+            this.outerObserver = outerObserver;
+            this.countable = countable;
+
+            if (countable) {
+                // Additional 4 bytes are due to size.
+                outerObserver.update(4L);
+            } else {
+                // Additional 5 bytes are due to size = -1 (4 bytes) and
+                // hasNext = false (1 byte).
+                outerObserver.update(5L);
+            }
+        }
+
+        /**
+         * @throws AssertionError if {@code obj} is not a {@link Long}
+         */
+        @Override
+        public void update(Observable obs, Object obj) {
+            if (!(obj instanceof Long)) {
+                throw new AssertionError("unexpected parameter object");
+            }
+
+            if (countable) {
+                outerObserver.update(obs, obj);
+            } else {
+                // Additional 1 byte is due to hasNext = true flag.
+                outerObserver.update(obs, 1 + (long) obj);
+            }
+        }
+    }
+
+    /**
+     * An {@link Iterable} that collects observers and attaches them to the next
+     * iterator it hands out.
+     */
+    public static abstract class ElementByteSizeObservableIterable<
+            V, InputT extends ElementByteSizeObservableIterator<V>>
+            implements Iterable<V> {
+        private List<Observer> observers = new ArrayList<>();
+
+        protected abstract InputT createIterator();
+
+        public void addObserver(Observer observer) {
+            observers.add(observer);
+        }
+
+        /** Returns a new iterator carrying all queued observers, then clears the queue. */
+        @Override
+        public InputT iterator() {
+            InputT iterator = createIterator();
+            for (Observer observer : observers) {
+                iterator.addObserver(observer);
+            }
+            observers.clear();
+            return iterator;
+        }
+    }
+
+    /** An {@link Iterator} that can notify observers of returned byte sizes. */
+    public static abstract class ElementByteSizeObservableIterator<V>
+            extends Observable implements Iterator<V> {
+        protected final void notifyValueReturned(long byteSize) {
+            setChanged();
+            notifyObservers(byteSize);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java
new file mode 100644
index 0000000..f1be5fb
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Observable;
+import java.util.Observer;
+
+/**
+ * An {@link Observer} that relays byte-size notifications to an outer
+ * {@link ElementByteSizeObserver}, adding fixed overhead on the way.
+ *
+ * <p>On construction it reports 4 bytes (the size) when {@code countable}
+ * is true, or 5 bytes (size = -1 plus the hasNext = false byte) otherwise.
+ * Each later notification is forwarded; in the non-countable case one extra
+ * byte is added for the hasNext = true flag.
+ */
+public class IteratorObserver implements Observer {
+    private final boolean countable;
+    private final ElementByteSizeObserver outerObserver;
+
+    /**
+     * Stores the arguments and immediately reports the fixed overhead to
+     * {@code outerObserver}.
+     *
+     * @param outerObserver the observer that receives all reported sizes
+     * @param countable selects the 4-byte (true) or 5-byte (false) overhead
+     *                  and whether per-notification bytes are added
+     */
+    public IteratorObserver(ElementByteSizeObserver outerObserver,
+                            boolean countable) {
+        this.countable = countable;
+        this.outerObserver = outerObserver;
+
+        // Countable: 4 bytes for the size.
+        // Otherwise: 5 bytes for size = -1 (4 bytes) and hasNext = false
+        // (1 byte).
+        outerObserver.update(countable ? 4L : 5L);
+    }
+
+    /**
+     * Forwards a byte-size notification to the outer observer.
+     *
+     * @param obs the observable that fired the notification
+     * @param obj the reported size; must be a {@link Long}
+     * @throws AssertionError if {@code obj} is not a {@link Long}
+     */
+    @Override
+    public void update(Observable obs, Object obj) {
+        final boolean isByteCount = obj instanceof Long;
+        if (!isByteCount) {
+            throw new AssertionError("unexpected parameter object");
+        }
+
+        if (!countable) {
+            // Additional 1 byte is due to hasNext = true flag.
+            outerObserver.update(obs, 1 + (long) obj);
+            return;
+        }
+
+        // Countable case: the reported size is forwarded untouched.
+        outerObserver.update(obs, obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java
new file mode 100644
index 0000000..3858ec6
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.List;
+
+/**
+ * A coder for {@link List}s, built on {@link IterableLikeCoder} and
+ * parameterized by the {@link Coder} used for the list elements.
+ *
+ * @param <T> the element type
+ */
+public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
+
+    /**
+     * Returns a {@code ListCoder} whose elements are handled by
+     * {@code elemCoder}.
+     */
+    public static <T> ListCoder<T> of(Coder<T> elemCoder) {
+        return new ListCoder<>(elemCoder);
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+    // Internal operations below here.
+
+    /** Returns the decoded element list unchanged. */
+    @Override
+    protected final List<T> decodeToIterable(List<T> decodedElements) {
+        return decodedElements;
+    }
+
+    protected ListCoder(Coder<T> elemCoder) {
+        super(elemCoder, "List");
+    }
+
+    /**
+     * Checks that the element coder is deterministic.
+     *
+     * @throws NonDeterministicException if the element coder is not
+     *         deterministic
+     */
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+        // The element coder is a single Coder, not an Iterable of coders:
+        // casting it to Iterable would fail with a ClassCastException at
+        // run time. Wrap it in a one-element list instead.
+        verifyDeterministic(this, "ListCoder.elemCoder must be deterministic",
+                java.util.Collections.<Coder<?>>singletonList(getElemCoder()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java
new file mode 100644
index 0000000..66b983c
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.collect.Maps;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * A coder for {@link Map}s.
+ *
+ * <p>The encoding is the number of entries written as a 4-byte int,
+ * followed by each entry's key (encoded with the key coder) and value
+ * (encoded with the value coder), in the iteration order of the map's
+ * entry set.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
+
+    /**
+     * Returns a {@code MapCoder} using {@code keyCoder} for keys and
+     * {@code valueCoder} for values.
+     */
+    public static <K, V> MapCoder<K, V> of(
+            Coder<K> keyCoder,
+            Coder<V> valueCoder) {
+        return new MapCoder<>(keyCoder, valueCoder);
+    }
+
+    /** Returns the coder used for map keys. */
+    public Coder<K> getKeyCoder() {
+        return keyCoder;
+    }
+
+    /** Returns the coder used for map values. */
+    public Coder<V> getValueCoder() {
+        return valueCoder;
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    private final Coder<K> keyCoder;
+    private final Coder<V> valueCoder;
+
+    private MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+        this.keyCoder = keyCoder;
+        this.valueCoder = valueCoder;
+    }
+
+    /**
+     * Writes the entry count followed by every key/value pair.
+     *
+     * @param map the map to encode; must not be null
+     * @param outStream the stream to write to
+     * @throws CoderException if {@code map} is null or the entry count
+     *         cannot be written
+     */
+    @Override
+    public void encode(Map<K, V> map, OutputStream outStream) throws CoderException {
+        if (map == null) {
+            throw new CoderException("cannot encode a null Map");
+        }
+        // no flush needed as DataOutputStream does not buffer
+        DataOutputStream dataOutStream = new DataOutputStream(outStream);
+        try {
+            dataOutStream.writeInt(map.size());
+        } catch (IOException e) {
+            // Report I/O failures as CoderException, like the other coders.
+            throw new CoderException(e);
+        }
+
+        for (Map.Entry<K, V> entry : map.entrySet()) {
+            keyCoder.encode(entry.getKey(), outStream);
+            valueCoder.encode(entry.getValue(), outStream);
+        }
+    }
+
+    /**
+     * Reads the entry count and then that many key/value pairs.
+     *
+     * @param inStream the stream to read from
+     * @return an immutable empty map when the encoded count is zero,
+     *         otherwise a new hash map holding the decoded entries
+     * @throws CoderException if the entry count cannot be read or is
+     *         negative
+     */
+    @Override
+    public Map<K, V> decode(InputStream inStream) {
+        DataInputStream dataInStream = new DataInputStream(inStream);
+        final int size;
+        try {
+            size = dataInStream.readInt();
+        } catch (IOException e) {
+            // Report I/O failures as CoderException, like the other coders.
+            throw new CoderException(e);
+        }
+        if (size < 0) {
+            // A negative count can only come from corrupt input.
+            throw new CoderException("Invalid encoded map size: " + size);
+        }
+        if (size == 0) {
+            return Collections.emptyMap();
+        }
+
+        Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
+        for (int i = 0; i < size; ++i) {
+            K key = keyCoder.decode(inStream);
+            V value = valueCoder.decode(inStream);
+            retval.put(key, value);
+        }
+        return retval;
+    }
+
+    /** Returns the key coder followed by the value coder. */
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Arrays.asList(keyCoder, valueCoder);
+    }
+
+    /**
+     * Always fails: the order of entries in a map may be non-deterministic.
+     *
+     * @throws NonDeterministicException always
+     */
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+        throw new NonDeterministicException(this,
+                "Ordering of entries in a Map may be non-deterministic.");
+    }
+
+    /**
+     * Reports 4 bytes for the entry count, then lets the key and value
+     * coders report the size of every entry.
+     *
+     * @param map the map whose encoded size is being observed
+     * @param observer the observer that receives the sizes
+     */
+    @Override
+    public void registerByteSizeObserver(
+            Map<K, V> map, ElementByteSizeObserver observer) {
+        observer.update(4L);
+        for (Map.Entry<K, V> entry : map.entrySet()) {
+            keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+            valueCoder.registerByteSizeObserver(entry.getValue(), observer);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java
new file mode 100644
index 0000000..4147732
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A coder for {@link Set}s, built on {@link IterableLikeCoder} and
+ * parameterized by the {@link Coder} used for the set elements.
+ *
+ * @param <T> the element type
+ */
+public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
+
+    /**
+     * Returns a {@code SetCoder} whose elements are handled by
+     * {@code elementCoder}.
+     */
+    public static <T> SetCoder<T> of(Coder<T> elementCoder) {
+        return new SetCoder<>(elementCoder);
+    }
+
+    /**
+     * Always fails: the order of elements in a set may be
+     * non-deterministic.
+     *
+     * @throws NonDeterministicException always
+     */
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+        throw new NonDeterministicException(this,
+                "Ordering of elements in a set may be non-deterministic.");
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+    // Internal operations below here.
+
+    protected SetCoder(Coder<T> elemCoder) {
+        super(elemCoder, "Set");
+    }
+
+    /** Copies the decoded elements into a new {@link HashSet}. */
+    @Override
+    protected final Set<T> decodeToIterable(List<T> decodedElements) {
+        final Set<T> asSet = new HashSet<>(decodedElements);
+        return asSet;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java
new file mode 100644
index 0000000..73ea8eb
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.base.Utf8;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+public class StringUtf8Coder extends AtomicCoder<String> {
+
+    public static StringUtf8Coder of() {
+        return INSTANCE;
+    }
+
+    
/////////////////////////////////////////////////////////////////////////////
+
+    private static final StringUtf8Coder INSTANCE = new StringUtf8Coder();
+
+    private static void writeString(String value, DataOutputStream dos)
+            throws IOException {
+        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+        VarInt.encode(bytes.length, dos);
+        dos.write(bytes);
+    }
+
+    private static String readString(DataInputStream dis) throws IOException {
+        int len = VarInt.decodeInt(dis);
+        if (len < 0) {
+            throw new CoderException("Invalid encoded string length: " + len);
+        }
+        byte[] bytes = new byte[len];
+        dis.readFully(bytes);
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
+
+    private StringUtf8Coder() {
+    }
+
+    @Override
+    public void encode(String value, OutputStream outStream) throws 
CoderException {
+        if (value == null) {
+            throw new CoderException("cannot encode a null String");
+        }
+        try {
+            writeString(value, new DataOutputStream(outStream));
+        } catch (IOException e) {
+            throw new CoderException(e);
+        }
+    }
+
+    @Override
+    public String decode(InputStream inStream)
+            throws CoderException {
+        try {
+            return readString(new DataInputStream(inStream));
+        } catch (EOFException | UTFDataFormatException exn) {
+            // These exceptions correspond to decoding problems, so change
+            // what kind of exception they're branded as.
+            throw new CoderException(exn);
+        } catch (Exception e) {
+            throw new CoderException(e);
+        }
+    }
+
+    @Override
+    public void verifyDeterministic() {
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+        return true;
+    }
+
+    @Override
+    public long getEncodedElementByteSize(String value) {
+        if (value == null) {
+            throw new CoderException("cannot encode a null String");
+        }
+        int size = Utf8.encodedLength(value);
+        return VarInt.getLength(size) + size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java
new file mode 100644
index 0000000..6a371f6
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.io.BaseEncoding;
+
+import java.util.Arrays;
+
+/**
+ * A wrapper around a {@code byte[]} whose {@link #equals(Object)} and
+ * {@link #hashCode()} are based on the array contents rather than on
+ * array identity.
+ */
+public class StructuralByteArray {
+    byte[] value;
+
+    /** Wraps {@code value}; the array is stored as-is, not copied. */
+    public StructuralByteArray(byte[] value) {
+        this.value = value;
+    }
+
+    /** Returns the wrapped array itself, not a copy. */
+    public byte[] getValue() {
+        return value;
+    }
+
+    /**
+     * Returns true only when {@code o} is a {@code StructuralByteArray}
+     * wrapping an array with the same contents.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof StructuralByteArray)) {
+            return false;
+        }
+        final StructuralByteArray other = (StructuralByteArray) o;
+        return Arrays.equals(value, other.value);
+    }
+
+    /** Returns a hash code computed from the array contents. */
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(value);
+    }
+
+    /** Returns {@code "base64:"} followed by the base64-encoded bytes. */
+    @Override
+    public String toString() {
+        final String encoded = BaseEncoding.base64().encode(value);
+        return "base64:" + encoded;
+    }
+}

Reply via email to