Repository: flink
Updated Branches:
  refs/heads/master 7398fdbfe -> 71d2e3ef1


[FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer

This also renames MultiplexingStreamRecordSerializer to
StreamElementSerializer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71d2e3ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71d2e3ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71d2e3ef

Branch: refs/heads/master
Commit: 71d2e3ef1e42174822709aa8217088f2a489975a
Parents: 7398fdb
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Oct 18 18:32:17 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 21 18:10:48 2016 +0200

----------------------------------------------------------------------
 .../flink/storm/wrappers/BoltWrapperTest.java   |   4 +-
 .../operator/AbstractCEPPatternOperator.java    |   6 +-
 .../AbstractKeyedCEPPatternOperator.java        |   4 +-
 .../api/datastream/AllWindowedStream.java       |  26 +-
 .../api/datastream/WindowedStream.java          |  26 +-
 .../runtime/io/RecordWriterOutput.java          |  15 +-
 .../runtime/io/StreamInputProcessor.java        |  18 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  30 +--
 .../MultiplexingStreamRecordSerializer.java     | 252 ------------------
 .../streamrecord/StreamElementSerializer.java   | 254 +++++++++++++++++++
 .../streamrecord/StreamRecordSerializer.java    | 149 -----------
 .../runtime/tasks/OneInputStreamTask.java       |   3 +-
 .../streaming/runtime/tasks/OperatorChain.java  |   7 +-
 .../streaming/runtime/tasks/StreamTask.java     |   8 -
 .../runtime/tasks/TwoInputStreamTask.java       |   3 +-
 .../consumer/StreamTestSingleInputGate.java     |   4 +-
 .../windowing/EvictingWindowOperatorTest.java   |  26 +-
 .../MultiplexingStreamRecordSerializerTest.java | 108 --------
 .../StreamElementSerializerTest.java            | 108 ++++++++
 .../StreamRecordSerializerTest.java             |  86 -------
 .../runtime/tasks/StreamTaskTestHarness.java    |   4 +-
 21 files changed, 451 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index e0659da..a7d60a6 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -43,8 +43,8 @@ import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.junit.Assert;
 import org.junit.Test;
@@ -68,7 +68,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class, 
StreamRecord.class})
+@PrepareForTest({StreamElementSerializer.class, WrapperSetupHelper.class, 
StreamRecord.class})
 @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class BoltWrapperTest extends AbstractTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 10bb6ff..455e864 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -46,7 +46,7 @@ import java.util.PriorityQueue;
 abstract public class AbstractCEPPatternOperator<IN, OUT> extends 
AbstractCEPBasePatternOperator<IN, OUT> {
        private static final long serialVersionUID = 7487334510746595640L;
 
-       private final MultiplexingStreamRecordSerializer<IN> 
streamRecordSerializer;
+       private final StreamElementSerializer<IN> streamRecordSerializer;
 
        // global nfa for all elements
        private NFA<IN> nfa;
@@ -60,7 +60,7 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> 
extends AbstractCEPBas
                        NFACompiler.NFAFactory<IN> nfaFactory) {
                super(inputSerializer, isProcessingTime);
 
-               this.streamRecordSerializer = new 
MultiplexingStreamRecordSerializer<>(inputSerializer);
+               this.streamRecordSerializer = new 
StreamElementSerializer<>(inputSerializer);
                this.nfa = nfaFactory.createNFA();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 07e2662..c3898c3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.io.IOException;
@@ -108,7 +108,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
 
                @SuppressWarnings("unchecked,rawtypes")
                TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
-                               (TypeSerializer) new 
MultiplexingStreamRecordSerializer<>(getInputSerializer());
+                               (TypeSerializer) new 
StreamElementSerializer<>(getInputSerializer());
 
                if (priorityQueueOperatorState == null) {
                        priorityQueueOperatorState = getPartitionedState(

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 6b09f3c..e77b5c8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -49,8 +50,8 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOper
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 /**
  * A {@code AllWindowedStream} represents a data stream where the stream of
@@ -269,8 +270,12 @@ public class AllWindowedStream<T, W extends Window> {
                WindowOperator<Byte, T, Iterable<T>, R, W> operator;
 
                if (evictor != null) {
-                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                                       new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
@@ -357,8 +362,12 @@ public class AllWindowedStream<T, W extends Window> {
                OneInputStreamOperator<T, R> operator;
 
                if (evictor != null) {
-                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                                       new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
@@ -450,9 +459,12 @@ public class AllWindowedStream<T, W extends Window> {
                OneInputStreamOperator<T, R> operator;
 
                if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
 
-                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                                       new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index ae98619..15ec5f1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -56,8 +57,8 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOper
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 /**
  * A {@code WindowedStream} represents a data stream where elements are 
grouped by
@@ -290,8 +291,12 @@ public class WindowedStream<T, K, W extends Window> {
                WindowOperator<K, T, Iterable<T>, R, W> operator;
 
                if (evictor != null) {
-                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                               new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
@@ -378,8 +383,12 @@ public class WindowedStream<T, K, W extends Window> {
                OneInputStreamOperator<T, R> operator;
 
                if (evictor != null) {
-                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                               new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
@@ -471,9 +480,12 @@ public class WindowedStream<T, K, W extends Window> {
                OneInputStreamOperator<T, R> operator;
 
                if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
 
-                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                               new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 9f046f6..c3ef464 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -28,9 +28,8 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,8 +47,7 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
        @SuppressWarnings("unchecked")
        public RecordWriterOutput(
                        
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
-                       TypeSerializer<OUT> outSerializer,
-                       boolean enableMultiplexing) {
+                       TypeSerializer<OUT> outSerializer) {
 
                checkNotNull(recordWriter);
                
@@ -58,13 +56,8 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
                this.recordWriter = 
(StreamRecordWriter<SerializationDelegate<StreamElement>>) 
                                (StreamRecordWriter<?>) recordWriter;
 
-               TypeSerializer<StreamElement> outRecordSerializer;
-               if (enableMultiplexing) {
-                       outRecordSerializer = new 
MultiplexingStreamRecordSerializer<OUT>(outSerializer);
-               } else {
-                       outRecordSerializer = (TypeSerializer<StreamElement>)
-                                       (TypeSerializer<?>) new 
StreamRecordSerializer<OUT>(outSerializer);
-               }
+               TypeSerializer<StreamElement> outRecordSerializer =
+                               new StreamElementSerializer<>(outSerializer);
 
                if (outSerializer != null) {
                        serializationDelegate = new 
SerializationDelegate<StreamElement>(outRecordSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 47e55dc..92b1556 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -42,9 +42,8 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 /**
  * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -88,8 +87,7 @@ public class StreamInputProcessor<IN> {
                        TypeSerializer<IN> inputSerializer,
                        StatefulTask checkpointedTask,
                        CheckpointingMode checkpointMode,
-                       IOManager ioManager,
-                       boolean enableMultiplexing) throws IOException {
+                       IOManager ioManager) throws IOException {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -107,15 +105,9 @@ public class StreamInputProcessor<IN> {
                        
this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
                }
                
-               if (enableMultiplexing) {
-                       MultiplexingStreamRecordSerializer<IN> ser = new 
MultiplexingStreamRecordSerializer<>(inputSerializer);
-                       this.deserializationDelegate = new 
NonReusingDeserializationDelegate<>(ser);
-               } else {
-                       StreamRecordSerializer<IN> ser = new 
StreamRecordSerializer<IN>(inputSerializer);
-                       this.deserializationDelegate = 
(NonReusingDeserializationDelegate<StreamElement>)
-                                       (NonReusingDeserializationDelegate<?>) 
new NonReusingDeserializationDelegate<>(ser);
-               }
-               
+               StreamElementSerializer<IN> ser = new 
StreamElementSerializer<>(inputSerializer);
+               this.deserializationDelegate = new 
NonReusingDeserializationDelegate<>(ser);
+
                // Initialize one deserializer per input channel
                this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
                

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a25c1a1..660f07e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -39,8 +39,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -95,8 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        TypeSerializer<IN2> inputSerializer2,
                        StatefulTask checkpointedTask,
                        CheckpointingMode checkpointMode,
-                       IOManager ioManager,
-                       boolean enableMultiplexing) throws IOException {
+                       IOManager ioManager) throws IOException {
                
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
 
@@ -114,25 +112,11 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        
this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
                }
                
-               if (enableMultiplexing) {
-                       MultiplexingStreamRecordSerializer<IN1> ser = new 
MultiplexingStreamRecordSerializer<>(inputSerializer1);
-                       this.deserializationDelegate1 = new 
NonReusingDeserializationDelegate<>(ser);
-               }
-               else {
-                       StreamRecordSerializer<IN1> ser = new 
StreamRecordSerializer<>(inputSerializer1);
-                       this.deserializationDelegate1 = 
(DeserializationDelegate<StreamElement>)
-                                       (DeserializationDelegate<?>) new 
NonReusingDeserializationDelegate<>(ser);
-               }
-               
-               if (enableMultiplexing) {
-                       MultiplexingStreamRecordSerializer<IN2> ser = new 
MultiplexingStreamRecordSerializer<>(inputSerializer2);
-                       this.deserializationDelegate2 = new 
NonReusingDeserializationDelegate<>(ser);
-               }
-               else {
-                       StreamRecordSerializer<IN2> ser = new 
StreamRecordSerializer<>(inputSerializer2);
-                       this.deserializationDelegate2 = 
(DeserializationDelegate<StreamElement>)
-                                       (DeserializationDelegate<?>) new 
NonReusingDeserializationDelegate<>(ser);
-               }
+               StreamElementSerializer<IN1> ser1 = new 
StreamElementSerializer<>(inputSerializer1);
+               this.deserializationDelegate1 = new 
NonReusingDeserializationDelegate<>(ser1);
+
+               StreamElementSerializer<IN2> ser2 = new 
StreamElementSerializer<>(inputSerializer2);
+               this.deserializationDelegate2 = new 
NonReusingDeserializationDelegate<>(ser2);
 
                // Initialize one deserializer per input channel
                this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
deleted file mode 100644
index 95e3ebd..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.io.IOException;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Serializer for {@link StreamRecord} and {@link Watermark}. This does not 
behave like a normal
- * {@link TypeSerializer}, instead, this is only used at the stream 
task/operator level for
- * transmitting StreamRecords and Watermarks.
- *
- * @param <T> The type of value in the StreamRecord
- */
-@Internal
-public final class MultiplexingStreamRecordSerializer<T> extends 
TypeSerializer<StreamElement> {
-
-       private static final long serialVersionUID = 1L;
-       
-       private static final int TAG_REC_WITH_TIMESTAMP = 0;
-       private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
-       private static final int TAG_WATERMARK = 2;
-       private static final int TAG_LATENCY_MARKER = 3;
-       
-       
-       private final TypeSerializer<T> typeSerializer;
-
-       
-       public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) 
{
-               if (serializer instanceof MultiplexingStreamRecordSerializer || 
serializer instanceof StreamRecordSerializer) {
-                       throw new RuntimeException("StreamRecordSerializer 
given to StreamRecordSerializer as value TypeSerializer: " + serializer);
-               }
-               this.typeSerializer = requireNonNull(serializer);
-       }
-
-       public TypeSerializer<T> getContainedTypeSerializer() {
-               return this.typeSerializer;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public MultiplexingStreamRecordSerializer<T> duplicate() {
-               TypeSerializer<T> copy = typeSerializer.duplicate();
-               return (copy == typeSerializer) ? this : new 
MultiplexingStreamRecordSerializer<T>(copy);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public StreamRecord<T> createInstance() {
-               return new StreamRecord<T>(typeSerializer.createInstance());
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public StreamElement copy(StreamElement from) {
-               // we can reuse the timestamp since Instant is immutable
-               if (from.isRecord()) {
-                       StreamRecord<T> fromRecord = from.asRecord();
-                       return 
fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
-               }
-               else if (from.isWatermark() || from.isLatencyMarker()) {
-                       // is immutable
-                       return from;
-               }
-               else {
-                       throw new RuntimeException();
-               }
-       }
-
-       @Override
-       public StreamElement copy(StreamElement from, StreamElement reuse) {
-               if (from.isRecord() && reuse.isRecord()) {
-                       StreamRecord<T> fromRecord = from.asRecord();
-                       StreamRecord<T> reuseRecord = reuse.asRecord();
-
-                       T valueCopy = 
typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
-                       fromRecord.copyTo(valueCopy, reuseRecord);
-                       return reuse;
-               }
-               else if (from.isWatermark() || from.isLatencyMarker()) {
-                       // is immutable
-                       return from;
-               }
-               else {
-                       throw new RuntimeException("Cannot copy " + from + " -> 
" + reuse);
-               }
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               int tag = source.readByte();
-               target.write(tag);
-
-               if (tag == TAG_REC_WITH_TIMESTAMP) {
-                       // move timestamp
-                       target.writeLong(source.readLong());
-                       typeSerializer.copy(source, target);
-               }
-               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-                       typeSerializer.copy(source, target);
-               }
-               else if (tag == TAG_WATERMARK) {
-                       target.writeLong(source.readLong());
-               }
-               else if (tag == TAG_LATENCY_MARKER) {
-                       target.writeLong(source.readLong());
-                       target.writeInt(source.readInt());
-                       target.writeInt(source.readInt());
-               } else {
-                       throw new IOException("Corrupt stream, found tag: " + 
tag);
-               }
-       }
-
-       @Override
-       public void serialize(StreamElement value, DataOutputView target) 
throws IOException {
-               if (value.isRecord()) {
-                       StreamRecord<T> record = value.asRecord();
-                       
-                       if (record.hasTimestamp()) {
-                               target.write(TAG_REC_WITH_TIMESTAMP);
-                               target.writeLong(record.getTimestamp());
-                       } else {
-                               target.write(TAG_REC_WITHOUT_TIMESTAMP);
-                       }
-                       typeSerializer.serialize(record.getValue(), target);
-               }
-               else if (value.isWatermark()) {
-                       target.write(TAG_WATERMARK);
-                       target.writeLong(value.asWatermark().getTimestamp());
-               }
-               else if (value.isLatencyMarker()) {
-                       target.write(TAG_LATENCY_MARKER);
-                       
target.writeLong(value.asLatencyMarker().getMarkedTime());
-                       target.writeInt(value.asLatencyMarker().getVertexID());
-                       
target.writeInt(value.asLatencyMarker().getSubtaskIndex());
-               }
-               else {
-                       throw new RuntimeException();
-               }
-       }
-       
-       @Override
-       public StreamElement deserialize(DataInputView source) throws 
IOException {
-               int tag = source.readByte();
-               if (tag == TAG_REC_WITH_TIMESTAMP) {
-                       long timestamp = source.readLong();
-                       return new 
StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
-               }
-               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-                       return new 
StreamRecord<T>(typeSerializer.deserialize(source));
-               }
-               else if (tag == TAG_WATERMARK) {
-                       return new Watermark(source.readLong());
-               }
-               else if (tag == TAG_LATENCY_MARKER) {
-                       return new LatencyMarker(source.readLong(), 
source.readInt(), source.readInt());
-               }
-               else {
-                       throw new IOException("Corrupt stream, found tag: " + 
tag);
-               }
-       }
-
-       @Override
-       public StreamElement deserialize(StreamElement reuse, DataInputView 
source) throws IOException {
-               int tag = source.readByte();
-               if (tag == TAG_REC_WITH_TIMESTAMP) {
-                       long timestamp = source.readLong();
-                       T value = typeSerializer.deserialize(source);
-                       StreamRecord<T> reuseRecord = reuse.asRecord();
-                       reuseRecord.replace(value, timestamp);
-                       return reuseRecord;
-               }
-               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-                       T value = typeSerializer.deserialize(source);
-                       StreamRecord<T> reuseRecord = reuse.asRecord();
-                       reuseRecord.replace(value);
-                       return reuseRecord;
-               }
-               else if (tag == TAG_WATERMARK) {
-                       return new Watermark(source.readLong());
-               }
-               else if (tag == TAG_LATENCY_MARKER) {
-                       return new LatencyMarker(source.readLong(), 
source.readInt(), source.readInt());
-               }
-               else {
-                       throw new IOException("Corrupt stream, found tag: " + 
tag);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof MultiplexingStreamRecordSerializer) {
-                       MultiplexingStreamRecordSerializer<?> other = 
(MultiplexingStreamRecordSerializer<?>) obj;
-
-                       return other.canEqual(this) && 
typeSerializer.equals(other.typeSerializer);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof MultiplexingStreamRecordSerializer;
-       }
-
-       @Override
-       public int hashCode() {
-               return typeSerializer.hashCode();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
new file mode 100644
index 0000000..66d32da
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Serializer for {@link StreamRecord}, {@link Watermark} and {@link 
LatencyMarker}.
+ *
+ * <p>
+ * This does not behave like a normal {@link TypeSerializer}, instead, this is 
only used at the
+ * stream task/operator level for transmitting StreamRecords, Watermarks and LatencyMarkers.
+ *
+ * @param <T> The type of value in the StreamRecord
+ */
+@Internal
+public final class StreamElementSerializer<T> extends 
TypeSerializer<StreamElement> {
+
+       private static final long serialVersionUID = 1L;
+       
+       private static final int TAG_REC_WITH_TIMESTAMP = 0;
+       private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
+       private static final int TAG_WATERMARK = 2;
+       private static final int TAG_LATENCY_MARKER = 3;
+       
+       
+       private final TypeSerializer<T> typeSerializer;
+
+       
+       public StreamElementSerializer(TypeSerializer<T> serializer) {
+               if (serializer instanceof StreamElementSerializer) {
+                       throw new RuntimeException("StreamRecordSerializer 
given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+               }
+               this.typeSerializer = requireNonNull(serializer);
+       }
+
+       public TypeSerializer<T> getContainedTypeSerializer() {
+               return this.typeSerializer;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public StreamElementSerializer<T> duplicate() {
+               TypeSerializer<T> copy = typeSerializer.duplicate();
+               return (copy == typeSerializer) ? this : new 
StreamElementSerializer<T>(copy);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Type serialization, copying, instantiation
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public StreamRecord<T> createInstance() {
+               return new StreamRecord<T>(typeSerializer.createInstance());
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public StreamElement copy(StreamElement from) {
+               // we can reuse the timestamp since Instant is immutable
+               if (from.isRecord()) {
+                       StreamRecord<T> fromRecord = from.asRecord();
+                       return 
fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
+               }
+               else if (from.isWatermark() || from.isLatencyMarker()) {
+                       // is immutable
+                       return from;
+               }
+               else {
+                       throw new RuntimeException();
+               }
+       }
+
+       @Override
+       public StreamElement copy(StreamElement from, StreamElement reuse) {
+               if (from.isRecord() && reuse.isRecord()) {
+                       StreamRecord<T> fromRecord = from.asRecord();
+                       StreamRecord<T> reuseRecord = reuse.asRecord();
+
+                       T valueCopy = 
typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
+                       fromRecord.copyTo(valueCopy, reuseRecord);
+                       return reuse;
+               }
+               else if (from.isWatermark() || from.isLatencyMarker()) {
+                       // is immutable
+                       return from;
+               }
+               else {
+                       throw new RuntimeException("Cannot copy " + from + " -> 
" + reuse);
+               }
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               int tag = source.readByte();
+               target.write(tag);
+
+               if (tag == TAG_REC_WITH_TIMESTAMP) {
+                       // move timestamp
+                       target.writeLong(source.readLong());
+                       typeSerializer.copy(source, target);
+               }
+               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+                       typeSerializer.copy(source, target);
+               }
+               else if (tag == TAG_WATERMARK) {
+                       target.writeLong(source.readLong());
+               }
+               else if (tag == TAG_LATENCY_MARKER) {
+                       target.writeLong(source.readLong());
+                       target.writeInt(source.readInt());
+                       target.writeInt(source.readInt());
+               } else {
+                       throw new IOException("Corrupt stream, found tag: " + 
tag);
+               }
+       }
+
+       @Override
+       public void serialize(StreamElement value, DataOutputView target) 
throws IOException {
+               if (value.isRecord()) {
+                       StreamRecord<T> record = value.asRecord();
+                       
+                       if (record.hasTimestamp()) {
+                               target.write(TAG_REC_WITH_TIMESTAMP);
+                               target.writeLong(record.getTimestamp());
+                       } else {
+                               target.write(TAG_REC_WITHOUT_TIMESTAMP);
+                       }
+                       typeSerializer.serialize(record.getValue(), target);
+               }
+               else if (value.isWatermark()) {
+                       target.write(TAG_WATERMARK);
+                       target.writeLong(value.asWatermark().getTimestamp());
+               }
+               else if (value.isLatencyMarker()) {
+                       target.write(TAG_LATENCY_MARKER);
+                       
target.writeLong(value.asLatencyMarker().getMarkedTime());
+                       target.writeInt(value.asLatencyMarker().getVertexID());
+                       
target.writeInt(value.asLatencyMarker().getSubtaskIndex());
+               }
+               else {
+                       throw new RuntimeException();
+               }
+       }
+       
+       @Override
+       public StreamElement deserialize(DataInputView source) throws 
IOException {
+               int tag = source.readByte();
+               if (tag == TAG_REC_WITH_TIMESTAMP) {
+                       long timestamp = source.readLong();
+                       return new 
StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
+               }
+               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+                       return new 
StreamRecord<T>(typeSerializer.deserialize(source));
+               }
+               else if (tag == TAG_WATERMARK) {
+                       return new Watermark(source.readLong());
+               }
+               else if (tag == TAG_LATENCY_MARKER) {
+                       return new LatencyMarker(source.readLong(), 
source.readInt(), source.readInt());
+               }
+               else {
+                       throw new IOException("Corrupt stream, found tag: " + 
tag);
+               }
+       }
+
+       @Override
+       public StreamElement deserialize(StreamElement reuse, DataInputView 
source) throws IOException {
+               int tag = source.readByte();
+               if (tag == TAG_REC_WITH_TIMESTAMP) {
+                       long timestamp = source.readLong();
+                       T value = typeSerializer.deserialize(source);
+                       StreamRecord<T> reuseRecord = reuse.asRecord();
+                       reuseRecord.replace(value, timestamp);
+                       return reuseRecord;
+               }
+               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+                       T value = typeSerializer.deserialize(source);
+                       StreamRecord<T> reuseRecord = reuse.asRecord();
+                       reuseRecord.replace(value);
+                       return reuseRecord;
+               }
+               else if (tag == TAG_WATERMARK) {
+                       return new Watermark(source.readLong());
+               }
+               else if (tag == TAG_LATENCY_MARKER) {
+                       return new LatencyMarker(source.readLong(), 
source.readInt(), source.readInt());
+               }
+               else {
+                       throw new IOException("Corrupt stream, found tag: " + 
tag);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  equals / canEqual / hashCode
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof StreamElementSerializer) {
+                       StreamElementSerializer<?> other = 
(StreamElementSerializer<?>) obj;
+
+                       return other.canEqual(this) && 
typeSerializer.equals(other.typeSerializer);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof StreamElementSerializer;
+       }
+
+       @Override
+       public int hashCode() {
+               return typeSerializer.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
deleted file mode 100644
index 71b43fe..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Serializer for {@link StreamRecord}. This version ignores timestamps and 
only deals with
- * the element.
- *
- * <p>
- * {@link MultiplexingStreamRecordSerializer} is a version that deals with 
timestamps and also
- * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark 
Watermarks} in the same
- * stream with {@link StreamRecord StreamRecords}.
- *
- * @see MultiplexingStreamRecordSerializer
- *
- * @param <T> The type of value in the {@link StreamRecord}
- */
-@Internal
-public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord<T>> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final TypeSerializer<T> typeSerializer;
-       
-
-       public StreamRecordSerializer(TypeSerializer<T> serializer) {
-               if (serializer instanceof StreamRecordSerializer) {
-                       throw new RuntimeException("StreamRecordSerializer 
given to StreamRecordSerializer as value TypeSerializer: " + serializer);
-               }
-               this.typeSerializer = Preconditions.checkNotNull(serializer);
-       }
-
-       public TypeSerializer<T> getContainedTypeSerializer() {
-               return this.typeSerializer;
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  General serializer and type utils
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public StreamRecordSerializer<T> duplicate() {
-               TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
-               return serializerCopy == typeSerializer ? this : new 
StreamRecordSerializer<T>(serializerCopy);
-       }
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public int getLength() {
-               return typeSerializer.getLength();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Type serialization, copying, instantiation
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public StreamRecord<T> createInstance() {
-               try {
-                       return new 
StreamRecord<T>(typeSerializer.createInstance());
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot instantiate 
StreamRecord.", e);
-               }
-       }
-       
-       @Override
-       public StreamRecord<T> copy(StreamRecord<T> from) {
-               return from.copy(typeSerializer.copy(from.getValue()));
-       }
-
-       @Override
-       public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> 
reuse) {
-               from.copyTo(typeSerializer.copy(from.getValue(), 
reuse.getValue()), reuse);
-               return reuse;
-       }
-
-       @Override
-       public void serialize(StreamRecord<T> value, DataOutputView target) 
throws IOException {
-               typeSerializer.serialize(value.getValue(), target);
-       }
-       
-       @Override
-       public StreamRecord<T> deserialize(DataInputView source) throws 
IOException {
-               return new StreamRecord<T>(typeSerializer.deserialize(source));
-       }
-
-       @Override
-       public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView 
source) throws IOException {
-               T element = typeSerializer.deserialize(reuse.getValue(), 
source);
-               reuse.replace(element);
-               return reuse;
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               typeSerializer.copy(source, target);
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof StreamRecordSerializer) {
-                       StreamRecordSerializer<?> other = 
(StreamRecordSerializer<?>) obj;
-
-                       return other.canEqual(this) && 
typeSerializer.equals(other.typeSerializer);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof StreamRecordSerializer;
-       }
-
-       @Override
-       public int hashCode() {
-               return typeSerializer.hashCode();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 97546b8..2e73e42 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -45,8 +45,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                        inputProcessor = new 
StreamInputProcessor<IN>(inputGates, inSerializer,
                                        this, 
                                        configuration.getCheckpointMode(),
-                                       getEnvironment().getIOManager(),
-                                       isSerializingMixedStream());
+                                       getEnvironment().getIOManager());
 
                        // make sure that stream tasks report their I/O 
statistics
                        AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 7342b6d..d02b066 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -77,7 +77,6 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                
                final ClassLoader userCodeClassloader = 
containingTask.getUserCodeClassLoader();
                final StreamConfig configuration = 
containingTask.getConfiguration();
-               final boolean enableMultiplexing = 
containingTask.isSerializingMixedStream();
 
                headOperator = 
configuration.getStreamOperator(userCodeClassloader);
 
@@ -99,7 +98,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                                
                                RecordWriterOutput<?> streamOutput = 
createStreamOutput(
                                                outEdge, 
chainedConfigs.get(outEdge.getSourceId()), i,
-                                               
containingTask.getEnvironment(), enableMultiplexing, reporter, 
containingTask.getName());
+                                               
containingTask.getEnvironment(), reporter, containingTask.getName());
        
                                this.streamOutputs[i] = streamOutput;
                                streamOutputMap.put(outEdge, streamOutput);
@@ -305,7 +304,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
        
        private static <T> RecordWriterOutput<T> createStreamOutput(
                        StreamEdge edge, StreamConfig upStreamConfig, int 
outputIndex,
-                       Environment taskEnvironment, boolean enableMultiplexing,
+                       Environment taskEnvironment,
                        AccumulatorRegistry.Reporter reporter, String taskName)
        {
                TypeSerializer<T> outSerializer = 
upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
@@ -322,7 +321,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                output.setReporter(reporter);
                
output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
                
-               return new RecordWriterOutput<>(output, outSerializer, 
enableMultiplexing);
+               return new RecordWriterOutput<>(output, outSerializer);
        }
        
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index eb5fde7..77efc7b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -457,14 +457,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                return tc == TimeCharacteristic.EventTime | tc == 
TimeCharacteristic.IngestionTime;
        }
 
-       /**
-        * Check if the tasks is sending a mixed stream (of watermarks, latency 
marks and records)
-        * @return true if stream contains more than just records
-        */
-       protected boolean isSerializingMixedStream() {
-               return isSerializingTimestamps() || 
getExecutionConfig().isLatencyTrackingEnabled();
-       }
-       
        // 
------------------------------------------------------------------------
        //  Access to properties and utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index bc80607..d695781 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -70,8 +70,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                                inputDeserializer1, inputDeserializer2,
                                this,
                                configuration.getCheckpointMode(),
-                               getEnvironment().getIOManager(),
-                               isSerializingMixedStream());
+                               getEnvironment().getIOManager());
 
                // make sure that stream tasks report their I/O statistics
                AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 1187fe6..322a0f0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -86,7 +86,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        final int channelIndex = i;
                        final RecordSerializer<SerializationDelegate<Object>> 
recordSerializer = new 
SpanningRecordSerializer<SerializationDelegate<Object>>();
                        final SerializationDelegate<Object> delegate = 
(SerializationDelegate<Object>) (SerializationDelegate<?>)
-                                       new 
SerializationDelegate<StreamElement>(new 
MultiplexingStreamRecordSerializer<T>(serializer));
+                                       new 
SerializationDelegate<StreamElement>(new 
StreamElementSerializer<T>(serializer));
 
                        inputQueues[channelIndex] = new 
ConcurrentLinkedQueue<InputValue<Object>>();
                        inputChannels[channelIndex] = new 
TestInputChannel(inputGate, i);

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 8f3af15..2e3d090 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -39,8 +40,8 @@ import 
org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -66,9 +67,12 @@ public class EvictingWindowOperatorTest {
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<Tuple2<String, 
Integer>>>) new StreamElementSerializer(inputType.createSerializer(new 
ExecutionConfig()));
 
-               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc = new ListStateDescriptor<>("window-contents",
-                       new 
StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig())));
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
 
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
@@ -135,8 +139,12 @@ public class EvictingWindowOperatorTest {
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
 
-               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc = new ListStateDescriptor<>("window-contents",
-                       new 
StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig())));
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<Tuple2<String, 
Integer>>>) new StreamElementSerializer(inputType.createSerializer(new 
ExecutionConfig()));
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
 
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
@@ -203,8 +211,12 @@ public class EvictingWindowOperatorTest {
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
-               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc = new ListStateDescriptor<>("window-contents",
-                       new 
StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig())));
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<Tuple2<String, 
Integer>>>) new StreamElementSerializer(inputType.createSerializer(new 
ExecutionConfig()));
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
                        TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java
deleted file mode 100644
index 1f0bf5a..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class MultiplexingStreamRecordSerializerTest {
-       
-       @Test
-       public void testDeepDuplication() {
-               @SuppressWarnings("unchecked")
-               TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) 
mock(TypeSerializer.class);
-               
-               @SuppressWarnings("unchecked")
-               TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) 
mock(TypeSerializer.class);
-               
-               when(serializer1.duplicate()).thenReturn(serializer2);
-               
-               MultiplexingStreamRecordSerializer<Long> streamRecSer = 
-                               new 
MultiplexingStreamRecordSerializer<Long>(serializer1);
-               
-               assertEquals(serializer1, 
streamRecSer.getContainedTypeSerializer());
-
-               MultiplexingStreamRecordSerializer<Long> copy = 
streamRecSer.duplicate();
-               assertNotEquals(copy, streamRecSer);
-               assertNotEquals(copy.getContainedTypeSerializer(), 
streamRecSer.getContainedTypeSerializer());
-       }
-
-       @Test
-       public void testBasicProperties() {
-               MultiplexingStreamRecordSerializer<Long> streamRecSer = 
-                               new 
MultiplexingStreamRecordSerializer<Long>(LongSerializer.INSTANCE);
-               
-               assertFalse(streamRecSer.isImmutableType());
-               assertEquals(Long.class, 
streamRecSer.createInstance().getValue().getClass());
-               assertEquals(-1L, streamRecSer.getLength());
-       }
-       
-       @Test
-       public void testSerialization() throws Exception {
-               final MultiplexingStreamRecordSerializer<String> serializer = 
-                               new 
MultiplexingStreamRecordSerializer<String>(StringSerializer.INSTANCE);
-
-               StreamRecord<String> withoutTimestamp = new 
StreamRecord<>("test 1 2 分享基督耶穌的愛給們,開拓雙贏!");
-               assertEquals(withoutTimestamp, 
serializeAndDeserialize(withoutTimestamp, serializer));
-
-               StreamRecord<String> withTimestamp = new StreamRecord<>("one 
more test 拓 們 分", 77L);
-               assertEquals(withTimestamp, 
serializeAndDeserialize(withTimestamp, serializer));
-
-               StreamRecord<String> negativeTimestamp = new 
StreamRecord<>("他", Long.MIN_VALUE);
-               assertEquals(negativeTimestamp, 
serializeAndDeserialize(negativeTimestamp, serializer));
-
-               Watermark positiveWatermark = new Watermark(13);
-               assertEquals(positiveWatermark, 
serializeAndDeserialize(positiveWatermark, serializer));
-
-               Watermark negativeWatermark = new 
Watermark(-4647654567676555876L);
-               assertEquals(negativeWatermark, 
serializeAndDeserialize(negativeWatermark, serializer));
-       }
-       
-       @SuppressWarnings("unchecked")
-       private static <T, X extends StreamElement> X serializeAndDeserialize(
-                       X record,
-                       MultiplexingStreamRecordSerializer<T> serializer) 
throws IOException {
-               
-               DataOutputSerializer output = new DataOutputSerializer(32);
-               serializer.serialize(record, output);
-               
-               // additional binary copy step
-               DataInputDeserializer copyInput = new 
DataInputDeserializer(output.getByteArray(), 0, output.length());
-               DataOutputSerializer copyOutput = new DataOutputSerializer(32);
-               serializer.copy(copyInput, copyOutput);
-               
-               DataInputDeserializer input = new 
DataInputDeserializer(copyOutput.getByteArray(), 0, copyOutput.length());
-               return (X) serializer.deserialize(input);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
new file mode 100644
index 0000000..0f42a65
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class StreamElementSerializerTest {
+       
+       @Test
+       public void testDeepDuplication() {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) 
mock(TypeSerializer.class);
+               
+               @SuppressWarnings("unchecked")
+               TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) 
mock(TypeSerializer.class);
+               
+               when(serializer1.duplicate()).thenReturn(serializer2);
+               
+               StreamElementSerializer<Long> streamRecSer =
+                               new StreamElementSerializer<Long>(serializer1);
+               
+               assertEquals(serializer1, 
streamRecSer.getContainedTypeSerializer());
+
+               StreamElementSerializer<Long> copy = streamRecSer.duplicate();
+               assertNotEquals(copy, streamRecSer);
+               assertNotEquals(copy.getContainedTypeSerializer(), 
streamRecSer.getContainedTypeSerializer());
+       }
+
+       @Test
+       public void testBasicProperties() {
+               StreamElementSerializer<Long> streamRecSer =
+                               new 
StreamElementSerializer<Long>(LongSerializer.INSTANCE);
+               
+               assertFalse(streamRecSer.isImmutableType());
+               assertEquals(Long.class, 
streamRecSer.createInstance().getValue().getClass());
+               assertEquals(-1L, streamRecSer.getLength());
+       }
+       
+       @Test
+       public void testSerialization() throws Exception {
+               final StreamElementSerializer<String> serializer =
+                               new 
StreamElementSerializer<String>(StringSerializer.INSTANCE);
+
+               StreamRecord<String> withoutTimestamp = new 
StreamRecord<>("test 1 2 分享基督耶穌的愛給們,開拓雙贏!");
+               assertEquals(withoutTimestamp, 
serializeAndDeserialize(withoutTimestamp, serializer));
+
+               StreamRecord<String> withTimestamp = new StreamRecord<>("one 
more test 拓 們 分", 77L);
+               assertEquals(withTimestamp, 
serializeAndDeserialize(withTimestamp, serializer));
+
+               StreamRecord<String> negativeTimestamp = new 
StreamRecord<>("他", Long.MIN_VALUE);
+               assertEquals(negativeTimestamp, 
serializeAndDeserialize(negativeTimestamp, serializer));
+
+               Watermark positiveWatermark = new Watermark(13);
+               assertEquals(positiveWatermark, 
serializeAndDeserialize(positiveWatermark, serializer));
+
+               Watermark negativeWatermark = new 
Watermark(-4647654567676555876L);
+               assertEquals(negativeWatermark, 
serializeAndDeserialize(negativeWatermark, serializer));
+       }
+       
+       @SuppressWarnings("unchecked")
+       private static <T, X extends StreamElement> X serializeAndDeserialize(
+                       X record,
+                       StreamElementSerializer<T> serializer) throws 
IOException {
+               
+               DataOutputSerializer output = new DataOutputSerializer(32);
+               serializer.serialize(record, output);
+               
+               // additional binary copy step
+               DataInputDeserializer copyInput = new 
DataInputDeserializer(output.getByteArray(), 0, output.length());
+               DataOutputSerializer copyOutput = new DataOutputSerializer(32);
+               serializer.copy(copyInput, copyOutput);
+               
+               DataInputDeserializer input = new 
DataInputDeserializer(copyOutput.getByteArray(), 0, copyOutput.length());
+               return (X) serializer.deserialize(input);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
deleted file mode 100644
index bdeb552..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class StreamRecordSerializerTest {
-       
-       @Test
-       public void testDeepDuplication() {
-               try {
-                       @SuppressWarnings("unchecked")
-                       TypeSerializer<Long> serializer1 = 
(TypeSerializer<Long>) mock(TypeSerializer.class);
-                       @SuppressWarnings("unchecked")
-                       TypeSerializer<Long> serializer2 = 
(TypeSerializer<Long>) mock(TypeSerializer.class);
-                       
-                       when(serializer1.duplicate()).thenReturn(serializer2);
-                       
-                       StreamRecordSerializer<Long> streamRecSer = new 
StreamRecordSerializer<Long>(serializer1);
-                       assertEquals(serializer1, 
streamRecSer.getContainedTypeSerializer());
-                       
-                       StreamRecordSerializer<Long> copy = 
streamRecSer.duplicate();
-                       assertNotEquals(copy, streamRecSer);
-                       assertNotEquals(copy.getContainedTypeSerializer(), 
streamRecSer.getContainedTypeSerializer());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testBasicProperties() {
-               try {
-                       StreamRecordSerializer<Long> streamRecSer = new 
StreamRecordSerializer<Long>(LongSerializer.INSTANCE);
-                       
-                       assertFalse(streamRecSer.isImmutableType());
-                       assertEquals(Long.class, 
streamRecSer.createInstance().getValue().getClass());
-                       assertEquals(LongSerializer.INSTANCE.getLength(), 
streamRecSer.getLength());
-                       
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testDeserializedValuesHaveNoTimestamps() throws Exception {
-               final StreamRecord<Long> original = new StreamRecord<>(42L);
-               
-               StreamRecordSerializer<Long> streamRecSer = new 
StreamRecordSerializer<Long>(LongSerializer.INSTANCE);
-
-               DataOutputSerializer buffer = new DataOutputSerializer(16);
-               streamRecSer.serialize(original, buffer);
-               
-               DataInputDeserializer input = new 
DataInputDeserializer(buffer.getByteArray(), 0, buffer.length());
-               StreamRecord<Long> result = streamRecSer.deserialize(input);
-               
-               assertFalse(result.hasTimestamp());
-               assertEquals(original, result);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index cbb5a9d..ce62624 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.junit.Assert;
 
@@ -110,7 +110,7 @@ public class StreamTaskTestHarness<OUT> {
                
streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
                outputSerializer = outputType.createSerializer(executionConfig);
-               outputStreamRecordSerializer = new 
MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
+               outputStreamRecordSerializer = new 
StreamElementSerializer<OUT>(outputSerializer);
        }
 
        public TimeServiceProvider getTimerService() {

Reply via email to