Repository: flink Updated Branches: refs/heads/master 7398fdbfe -> 71d2e3ef1
[FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer This also renames MultiplexingStreamRecordSerializer to StreamElementSerializer. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71d2e3ef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71d2e3ef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71d2e3ef Branch: refs/heads/master Commit: 71d2e3ef1e42174822709aa8217088f2a489975a Parents: 7398fdb Author: Aljoscha Krettek <[email protected]> Authored: Tue Oct 18 18:32:17 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 21 18:10:48 2016 +0200 ---------------------------------------------------------------------- .../flink/storm/wrappers/BoltWrapperTest.java | 4 +- .../operator/AbstractCEPPatternOperator.java | 6 +- .../AbstractKeyedCEPPatternOperator.java | 4 +- .../api/datastream/AllWindowedStream.java | 26 +- .../api/datastream/WindowedStream.java | 26 +- .../runtime/io/RecordWriterOutput.java | 15 +- .../runtime/io/StreamInputProcessor.java | 18 +- .../runtime/io/StreamTwoInputProcessor.java | 30 +-- .../MultiplexingStreamRecordSerializer.java | 252 ------------------ .../streamrecord/StreamElementSerializer.java | 254 +++++++++++++++++++ .../streamrecord/StreamRecordSerializer.java | 149 ----------- .../runtime/tasks/OneInputStreamTask.java | 3 +- .../streaming/runtime/tasks/OperatorChain.java | 7 +- .../streaming/runtime/tasks/StreamTask.java | 8 - .../runtime/tasks/TwoInputStreamTask.java | 3 +- .../consumer/StreamTestSingleInputGate.java | 4 +- .../windowing/EvictingWindowOperatorTest.java | 26 +- .../MultiplexingStreamRecordSerializerTest.java | 108 -------- .../StreamElementSerializerTest.java | 108 ++++++++ .../StreamRecordSerializerTest.java | 86 ------- .../runtime/tasks/StreamTaskTestHarness.java | 4 +- 21 files changed, 451 insertions(+), 690 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index e0659da..a7d60a6 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -43,8 +43,8 @@ import org.apache.flink.storm.util.TestDummyBolt; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.junit.Assert; import org.junit.Test; @@ -68,7 +68,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) -@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class, StreamRecord.class}) +@PrepareForTest({StreamElementSerializer.class, WrapperSetupHelper.class, StreamRecord.class}) @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) public class BoltWrapperTest extends AbstractTest { http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java index 10bb6ff..455e864 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java @@ -26,7 +26,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -46,7 +46,7 @@ import java.util.PriorityQueue; abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> { private static final long serialVersionUID = 7487334510746595640L; - private final MultiplexingStreamRecordSerializer<IN> streamRecordSerializer; + private final StreamElementSerializer<IN> streamRecordSerializer; // global nfa for all elements private NFA<IN> nfa; @@ -60,7 +60,7 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas NFACompiler.NFAFactory<IN> nfaFactory) { super(inputSerializer, isProcessingTime); - this.streamRecordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); + this.streamRecordSerializer = new StreamElementSerializer<>(inputSerializer); this.nfa = nfaFactory.createNFA(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 07e2662..c3898c3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.io.IOException; @@ -108,7 +108,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst @SuppressWarnings("unchecked,rawtypes") TypeSerializer<StreamRecord<IN>> streamRecordSerializer = - (TypeSerializer) new MultiplexingStreamRecordSerializer<>(getInputSerializer()); + (TypeSerializer) new StreamElementSerializer<>(getInputSerializer()); if (priorityQueueOperatorState == null) { priorityQueueOperatorState = getPartitionedState( http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 6b09f3c..e77b5c8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -49,8 +50,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOper import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; /** * A {@code AllWindowedStream} represents a data stream where the stream of @@ -269,8 +270,12 @@ public class AllWindowedStream<T, W extends Window> { WindowOperator<Byte, T, Iterable<T>, R, W> operator; if (evictor != null) { - ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + 
@SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; @@ -357,8 +362,12 @@ public class AllWindowedStream<T, W extends Window> { OneInputStreamOperator<T, R> operator; if (evictor != null) { - ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; @@ -450,9 +459,12 @@ public class AllWindowedStream<T, W extends Window> { OneInputStreamOperator<T, R> operator; if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); - ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", 
streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index ae98619..15ec5f1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -56,8 +57,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOper import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; /** * A {@code WindowedStream} represents a data stream where elements are 
grouped by @@ -290,8 +291,12 @@ public class WindowedStream<T, K, W extends Window> { WindowOperator<K, T, Iterable<T>, R, W> operator; if (evictor != null) { - ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; @@ -378,8 +383,12 @@ public class WindowedStream<T, K, W extends Window> { OneInputStreamOperator<T, R> operator; if (evictor != null) { - ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; @@ -471,9 +480,12 @@ public class WindowedStream<T, K, W extends Window> { OneInputStreamOperator<T, R> operator; if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); - ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 9f046f6..c3ef464 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -28,9 +28,8 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -48,8 +47,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { @SuppressWarnings("unchecked") 
public RecordWriterOutput( StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, - TypeSerializer<OUT> outSerializer, - boolean enableMultiplexing) { + TypeSerializer<OUT> outSerializer) { checkNotNull(recordWriter); @@ -58,13 +56,8 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>) (StreamRecordWriter<?>) recordWriter; - TypeSerializer<StreamElement> outRecordSerializer; - if (enableMultiplexing) { - outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer); - } else { - outRecordSerializer = (TypeSerializer<StreamElement>) - (TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer); - } + TypeSerializer<StreamElement> outRecordSerializer = + new StreamElementSerializer<>(outSerializer); if (outSerializer != null) { serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer); http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 47e55dc..92b1556 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -42,9 +42,8 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; -import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; /** * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. @@ -88,8 +87,7 @@ public class StreamInputProcessor<IN> { TypeSerializer<IN> inputSerializer, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, - IOManager ioManager, - boolean enableMultiplexing) throws IOException { + IOManager ioManager) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); @@ -107,15 +105,9 @@ public class StreamInputProcessor<IN> { this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); } - if (enableMultiplexing) { - MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer); - this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser); - } else { - StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer); - this.deserializationDelegate = (NonReusingDeserializationDelegate<StreamElement>) - (NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<>(ser); - } - + StreamElementSerializer<IN> ser = new StreamElementSerializer<>(inputSerializer); + this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser); + // Initialize one deserializer per input channel this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index a25c1a1..660f07e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -39,8 +39,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import java.io.IOException; import java.util.Arrays; @@ -95,8 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { TypeSerializer<IN2> inputSerializer2, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, - IOManager ioManager, - boolean enableMultiplexing) throws IOException { + IOManager ioManager) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); @@ -114,25 +112,11 @@ public class StreamTwoInputProcessor<IN1, IN2> { this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); } - if (enableMultiplexing) { - MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer1); - this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser); - } - else { - StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<>(inputSerializer1); - this.deserializationDelegate1 = (DeserializationDelegate<StreamElement>) - (DeserializationDelegate<?>) new 
NonReusingDeserializationDelegate<>(ser); - } - - if (enableMultiplexing) { - MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer2); - this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser); - } - else { - StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<>(inputSerializer2); - this.deserializationDelegate2 = (DeserializationDelegate<StreamElement>) - (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<>(ser); - } + StreamElementSerializer<IN1> ser1 = new StreamElementSerializer<>(inputSerializer1); + this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser1); + + StreamElementSerializer<IN2> ser2 = new StreamElementSerializer<>(inputSerializer2); + this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser2); // Initialize one deserializer per input channel this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java deleted file mode 100644 index 95e3ebd..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.streamrecord; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.streaming.api.watermark.Watermark; - -import java.io.IOException; - -import static java.util.Objects.requireNonNull; - -/** - * Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal - * {@link TypeSerializer}, instead, this is only used at the stream task/operator level for - * transmitting StreamRecords and Watermarks. 
- * - * @param <T> The type of value in the StreamRecord - */ -@Internal -public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> { - - private static final long serialVersionUID = 1L; - - private static final int TAG_REC_WITH_TIMESTAMP = 0; - private static final int TAG_REC_WITHOUT_TIMESTAMP = 1; - private static final int TAG_WATERMARK = 2; - private static final int TAG_LATENCY_MARKER = 3; - - - private final TypeSerializer<T> typeSerializer; - - - public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) { - if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) { - throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); - } - this.typeSerializer = requireNonNull(serializer); - } - - public TypeSerializer<T> getContainedTypeSerializer() { - return this.typeSerializer; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public MultiplexingStreamRecordSerializer<T> duplicate() { - TypeSerializer<T> copy = typeSerializer.duplicate(); - return (copy == typeSerializer) ? 
this : new MultiplexingStreamRecordSerializer<T>(copy); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public StreamRecord<T> createInstance() { - return new StreamRecord<T>(typeSerializer.createInstance()); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public StreamElement copy(StreamElement from) { - // we can reuse the timestamp since Instant is immutable - if (from.isRecord()) { - StreamRecord<T> fromRecord = from.asRecord(); - return fromRecord.copy(typeSerializer.copy(fromRecord.getValue())); - } - else if (from.isWatermark() || from.isLatencyMarker()) { - // is immutable - return from; - } - else { - throw new RuntimeException(); - } - } - - @Override - public StreamElement copy(StreamElement from, StreamElement reuse) { - if (from.isRecord() && reuse.isRecord()) { - StreamRecord<T> fromRecord = from.asRecord(); - StreamRecord<T> reuseRecord = reuse.asRecord(); - - T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()); - fromRecord.copyTo(valueCopy, reuseRecord); - return reuse; - } - else if (from.isWatermark() || from.isLatencyMarker()) { - // is immutable - return from; - } - else { - throw new RuntimeException("Cannot copy " + from + " -> " + reuse); - } - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - int tag = source.readByte(); - target.write(tag); - - if (tag == TAG_REC_WITH_TIMESTAMP) { - // move timestamp - target.writeLong(source.readLong()); - typeSerializer.copy(source, target); - } - else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { - typeSerializer.copy(source, target); - } - else if (tag == TAG_WATERMARK) { - target.writeLong(source.readLong()); - } - else if (tag == TAG_LATENCY_MARKER) { - target.writeLong(source.readLong()); - target.writeInt(source.readInt()); - 
target.writeInt(source.readInt()); - } else { - throw new IOException("Corrupt stream, found tag: " + tag); - } - } - - @Override - public void serialize(StreamElement value, DataOutputView target) throws IOException { - if (value.isRecord()) { - StreamRecord<T> record = value.asRecord(); - - if (record.hasTimestamp()) { - target.write(TAG_REC_WITH_TIMESTAMP); - target.writeLong(record.getTimestamp()); - } else { - target.write(TAG_REC_WITHOUT_TIMESTAMP); - } - typeSerializer.serialize(record.getValue(), target); - } - else if (value.isWatermark()) { - target.write(TAG_WATERMARK); - target.writeLong(value.asWatermark().getTimestamp()); - } - else if (value.isLatencyMarker()) { - target.write(TAG_LATENCY_MARKER); - target.writeLong(value.asLatencyMarker().getMarkedTime()); - target.writeInt(value.asLatencyMarker().getVertexID()); - target.writeInt(value.asLatencyMarker().getSubtaskIndex()); - } - else { - throw new RuntimeException(); - } - } - - @Override - public StreamElement deserialize(DataInputView source) throws IOException { - int tag = source.readByte(); - if (tag == TAG_REC_WITH_TIMESTAMP) { - long timestamp = source.readLong(); - return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp); - } - else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { - return new StreamRecord<T>(typeSerializer.deserialize(source)); - } - else if (tag == TAG_WATERMARK) { - return new Watermark(source.readLong()); - } - else if (tag == TAG_LATENCY_MARKER) { - return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); - } - else { - throw new IOException("Corrupt stream, found tag: " + tag); - } - } - - @Override - public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException { - int tag = source.readByte(); - if (tag == TAG_REC_WITH_TIMESTAMP) { - long timestamp = source.readLong(); - T value = typeSerializer.deserialize(source); - StreamRecord<T> reuseRecord = reuse.asRecord(); - reuseRecord.replace(value, 
timestamp); - return reuseRecord; - } - else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { - T value = typeSerializer.deserialize(source); - StreamRecord<T> reuseRecord = reuse.asRecord(); - reuseRecord.replace(value); - return reuseRecord; - } - else if (tag == TAG_WATERMARK) { - return new Watermark(source.readLong()); - } - else if (tag == TAG_LATENCY_MARKER) { - return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); - } - else { - throw new IOException("Corrupt stream, found tag: " + tag); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object obj) { - if (obj instanceof MultiplexingStreamRecordSerializer) { - MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj; - - return other.canEqual(this) && typeSerializer.equals(other.typeSerializer); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof MultiplexingStreamRecordSerializer; - } - - @Override - public int hashCode() { - return typeSerializer.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java new file mode 100644 index 0000000..66d32da --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.streaming.api.watermark.Watermark; + +import java.io.IOException; + +import static java.util.Objects.requireNonNull; + +/** + * Serializer for {@link StreamRecord}, {@link Watermark} and {@link LatencyMarker}. + * + * <p> + * This does not behave like a normal {@link TypeSerializer}, instead, this is only used at the + * stream task/operator level for transmitting StreamRecords and Watermarks. 
+ * + * @param <T> The type of value in the StreamRecord + */ +@Internal +public final class StreamElementSerializer<T> extends TypeSerializer<StreamElement> { + + private static final long serialVersionUID = 1L; + + private static final int TAG_REC_WITH_TIMESTAMP = 0; + private static final int TAG_REC_WITHOUT_TIMESTAMP = 1; + private static final int TAG_WATERMARK = 2; + private static final int TAG_LATENCY_MARKER = 3; + + + private final TypeSerializer<T> typeSerializer; + + + public StreamElementSerializer(TypeSerializer<T> serializer) { + if (serializer instanceof StreamElementSerializer) { + throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); + } + this.typeSerializer = requireNonNull(serializer); + } + + public TypeSerializer<T> getContainedTypeSerializer() { + return this.typeSerializer; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public StreamElementSerializer<T> duplicate() { + TypeSerializer<T> copy = typeSerializer.duplicate(); + return (copy == typeSerializer) ? 
this : new StreamElementSerializer<T>(copy); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public StreamRecord<T> createInstance() { + return new StreamRecord<T>(typeSerializer.createInstance()); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public StreamElement copy(StreamElement from) { + // we can reuse the timestamp since Instant is immutable + if (from.isRecord()) { + StreamRecord<T> fromRecord = from.asRecord(); + return fromRecord.copy(typeSerializer.copy(fromRecord.getValue())); + } + else if (from.isWatermark() || from.isLatencyMarker()) { + // is immutable + return from; + } + else { + throw new RuntimeException(); + } + } + + @Override + public StreamElement copy(StreamElement from, StreamElement reuse) { + if (from.isRecord() && reuse.isRecord()) { + StreamRecord<T> fromRecord = from.asRecord(); + StreamRecord<T> reuseRecord = reuse.asRecord(); + + T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()); + fromRecord.copyTo(valueCopy, reuseRecord); + return reuse; + } + else if (from.isWatermark() || from.isLatencyMarker()) { + // is immutable + return from; + } + else { + throw new RuntimeException("Cannot copy " + from + " -> " + reuse); + } + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int tag = source.readByte(); + target.write(tag); + + if (tag == TAG_REC_WITH_TIMESTAMP) { + // move timestamp + target.writeLong(source.readLong()); + typeSerializer.copy(source, target); + } + else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { + typeSerializer.copy(source, target); + } + else if (tag == TAG_WATERMARK) { + target.writeLong(source.readLong()); + } + else if (tag == TAG_LATENCY_MARKER) { + target.writeLong(source.readLong()); + target.writeInt(source.readInt()); + target.writeInt(source.readInt()); + 
} else { + throw new IOException("Corrupt stream, found tag: " + tag); + } + } + + @Override + public void serialize(StreamElement value, DataOutputView target) throws IOException { + if (value.isRecord()) { + StreamRecord<T> record = value.asRecord(); + + if (record.hasTimestamp()) { + target.write(TAG_REC_WITH_TIMESTAMP); + target.writeLong(record.getTimestamp()); + } else { + target.write(TAG_REC_WITHOUT_TIMESTAMP); + } + typeSerializer.serialize(record.getValue(), target); + } + else if (value.isWatermark()) { + target.write(TAG_WATERMARK); + target.writeLong(value.asWatermark().getTimestamp()); + } + else if (value.isLatencyMarker()) { + target.write(TAG_LATENCY_MARKER); + target.writeLong(value.asLatencyMarker().getMarkedTime()); + target.writeInt(value.asLatencyMarker().getVertexID()); + target.writeInt(value.asLatencyMarker().getSubtaskIndex()); + } + else { + throw new RuntimeException(); + } + } + + @Override + public StreamElement deserialize(DataInputView source) throws IOException { + int tag = source.readByte(); + if (tag == TAG_REC_WITH_TIMESTAMP) { + long timestamp = source.readLong(); + return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp); + } + else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { + return new StreamRecord<T>(typeSerializer.deserialize(source)); + } + else if (tag == TAG_WATERMARK) { + return new Watermark(source.readLong()); + } + else if (tag == TAG_LATENCY_MARKER) { + return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); + } + else { + throw new IOException("Corrupt stream, found tag: " + tag); + } + } + + @Override + public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException { + int tag = source.readByte(); + if (tag == TAG_REC_WITH_TIMESTAMP) { + long timestamp = source.readLong(); + T value = typeSerializer.deserialize(source); + StreamRecord<T> reuseRecord = reuse.asRecord(); + reuseRecord.replace(value, timestamp); + return reuseRecord; + } + else 
if (tag == TAG_REC_WITHOUT_TIMESTAMP) { + T value = typeSerializer.deserialize(source); + StreamRecord<T> reuseRecord = reuse.asRecord(); + reuseRecord.replace(value); + return reuseRecord; + } + else if (tag == TAG_WATERMARK) { + return new Watermark(source.readLong()); + } + else if (tag == TAG_LATENCY_MARKER) { + return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); + } + else { + throw new IOException("Corrupt stream, found tag: " + tag); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public boolean equals(Object obj) { + if (obj instanceof StreamElementSerializer) { + StreamElementSerializer<?> other = (StreamElementSerializer<?>) obj; + + return other.canEqual(this) && typeSerializer.equals(other.typeSerializer); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof StreamElementSerializer; + } + + @Override + public int hashCode() { + return typeSerializer.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java deleted file mode 100644 index 71b43fe..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.streamrecord; - -import java.io.IOException; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.Preconditions; - -/** - * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with - * the element. - * - * <p> - * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also - * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same - * stream with {@link StreamRecord StreamRecords}. 
- * - * @see MultiplexingStreamRecordSerializer - * - * @param <T> The type of value in the {@link StreamRecord} - */ -@Internal -public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> { - - private static final long serialVersionUID = 1L; - - private final TypeSerializer<T> typeSerializer; - - - public StreamRecordSerializer(TypeSerializer<T> serializer) { - if (serializer instanceof StreamRecordSerializer) { - throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); - } - this.typeSerializer = Preconditions.checkNotNull(serializer); - } - - public TypeSerializer<T> getContainedTypeSerializer() { - return this.typeSerializer; - } - - // ------------------------------------------------------------------------ - // General serializer and type utils - // ------------------------------------------------------------------------ - - @Override - public StreamRecordSerializer<T> duplicate() { - TypeSerializer<T> serializerCopy = typeSerializer.duplicate(); - return serializerCopy == typeSerializer ? 
this : new StreamRecordSerializer<T>(serializerCopy); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public int getLength() { - return typeSerializer.getLength(); - } - - // ------------------------------------------------------------------------ - // Type serialization, copying, instantiation - // ------------------------------------------------------------------------ - - @Override - public StreamRecord<T> createInstance() { - try { - return new StreamRecord<T>(typeSerializer.createInstance()); - } catch (Exception e) { - throw new RuntimeException("Cannot instantiate StreamRecord.", e); - } - } - - @Override - public StreamRecord<T> copy(StreamRecord<T> from) { - return from.copy(typeSerializer.copy(from.getValue())); - } - - @Override - public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) { - from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse); - return reuse; - } - - @Override - public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException { - typeSerializer.serialize(value.getValue(), target); - } - - @Override - public StreamRecord<T> deserialize(DataInputView source) throws IOException { - return new StreamRecord<T>(typeSerializer.deserialize(source)); - } - - @Override - public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException { - T element = typeSerializer.deserialize(reuse.getValue(), source); - reuse.replace(element); - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - typeSerializer.copy(source, target); - } - - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object obj) { - if (obj instanceof StreamRecordSerializer) { - StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj; - - return other.canEqual(this) && typeSerializer.equals(other.typeSerializer); - } 
else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof StreamRecordSerializer; - } - - @Override - public int hashCode() { - return typeSerializer.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 97546b8..2e73e42 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -45,8 +45,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer, this, configuration.getCheckpointMode(), - getEnvironment().getIOManager(), - isSerializingMixedStream()); + getEnvironment().getIOManager()); // make sure that stream tasks report their I/O statistics AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 7342b6d..d02b066 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -77,7 +77,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader(); final StreamConfig configuration = containingTask.getConfiguration(); - final boolean enableMultiplexing = containingTask.isSerializingMixedStream(); headOperator = configuration.getStreamOperator(userCodeClassloader); @@ -99,7 +98,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { RecordWriterOutput<?> streamOutput = createStreamOutput( outEdge, chainedConfigs.get(outEdge.getSourceId()), i, - containingTask.getEnvironment(), enableMultiplexing, reporter, containingTask.getName()); + containingTask.getEnvironment(), reporter, containingTask.getName()); this.streamOutputs[i] = streamOutput; streamOutputMap.put(outEdge, streamOutput); @@ -305,7 +304,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { private static <T> RecordWriterOutput<T> createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, - Environment taskEnvironment, boolean enableMultiplexing, + Environment taskEnvironment, AccumulatorRegistry.Reporter reporter, String taskName) { TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader()); @@ -322,7 +321,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { output.setReporter(reporter); output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup()); - return new RecordWriterOutput<>(output, outSerializer, enableMultiplexing); + return new RecordWriterOutput<>(output, outSerializer); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index eb5fde7..77efc7b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -457,14 +457,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime; } - /** - * Check if the tasks is sending a mixed stream (of watermarks, latency marks and records) - * @return true if stream contains more than just records - */ - protected boolean isSerializingMixedStream() { - return isSerializingTimestamps() || getExecutionConfig().isLatencyTrackingEnabled(); - } - // ------------------------------------------------------------------------ // Access to properties and utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index bc80607..d695781 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -70,8 +70,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS inputDeserializer1, inputDeserializer2, this, 
configuration.getCheckpointMode(), - getEnvironment().getIOManager(), - isSerializingMixedStream()); + getEnvironment().getIOManager()); // make sure that stream tasks report their I/O statistics AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 1187fe6..322a0f0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -86,7 +86,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { final int channelIndex = i; final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>(); final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) 
(SerializationDelegate<?>) - new SerializationDelegate<StreamElement>(new MultiplexingStreamRecordSerializer<T>(serializer)); + new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer)); inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>(); inputChannels[channelIndex] = new TestInputChannel(inputGate, i); http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index 8f3af15..2e3d090 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -39,8 +40,8 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -66,9 +67,12 @@ public class EvictingWindowOperatorTest { TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer = + (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig())); - ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig()))); + ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( @@ -135,8 +139,12 @@ public class EvictingWindowOperatorTest { TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer = + (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig())); + + 
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( @@ -203,8 +211,12 @@ public class EvictingWindowOperatorTest { TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer = + (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig())); + + ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>( TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java deleted file mode 100644 index 1f0bf5a..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java +++ /dev/null @@ -1,108 +0,0 
@@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.streamrecord; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; -import org.apache.flink.streaming.api.watermark.Watermark; - -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class MultiplexingStreamRecordSerializerTest { - - @Test - public void testDeepDuplication() { - @SuppressWarnings("unchecked") - TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class); - - @SuppressWarnings("unchecked") - TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class); - - when(serializer1.duplicate()).thenReturn(serializer2); - - MultiplexingStreamRecordSerializer<Long> 
streamRecSer = - new MultiplexingStreamRecordSerializer<Long>(serializer1); - - assertEquals(serializer1, streamRecSer.getContainedTypeSerializer()); - - MultiplexingStreamRecordSerializer<Long> copy = streamRecSer.duplicate(); - assertNotEquals(copy, streamRecSer); - assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer()); - } - - @Test - public void testBasicProperties() { - MultiplexingStreamRecordSerializer<Long> streamRecSer = - new MultiplexingStreamRecordSerializer<Long>(LongSerializer.INSTANCE); - - assertFalse(streamRecSer.isImmutableType()); - assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass()); - assertEquals(-1L, streamRecSer.getLength()); - } - - @Test - public void testSerialization() throws Exception { - final MultiplexingStreamRecordSerializer<String> serializer = - new MultiplexingStreamRecordSerializer<String>(StringSerializer.INSTANCE); - - StreamRecord<String> withoutTimestamp = new StreamRecord<>("test 1 2 分享基督耶穌的愛給們，開拓雙贏!"); - assertEquals(withoutTimestamp, serializeAndDeserialize(withoutTimestamp, serializer)); - - StreamRecord<String> withTimestamp = new StreamRecord<>("one more test 拓 們 分", 77L); - assertEquals(withTimestamp, serializeAndDeserialize(withTimestamp, serializer)); - - StreamRecord<String> negativeTimestamp = new StreamRecord<>("他", Long.MIN_VALUE); - assertEquals(negativeTimestamp, serializeAndDeserialize(negativeTimestamp, serializer)); - - Watermark positiveWatermark = new Watermark(13); - assertEquals(positiveWatermark, serializeAndDeserialize(positiveWatermark, serializer)); - - Watermark negativeWatermark = new Watermark(-4647654567676555876L); - assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer)); - } - - @SuppressWarnings("unchecked") - private static <T, X extends StreamElement> X serializeAndDeserialize( - X record, - MultiplexingStreamRecordSerializer<T> serializer) throws IOException { - - 
DataOutputSerializer output = new DataOutputSerializer(32); - serializer.serialize(record, output); - - // additional binary copy step - DataInputDeserializer copyInput = new DataInputDeserializer(output.getByteArray(), 0, output.length()); - DataOutputSerializer copyOutput = new DataOutputSerializer(32); - serializer.copy(copyInput, copyOutput); - - DataInputDeserializer input = new DataInputDeserializer(copyOutput.getByteArray(), 0, copyOutput.length()); - return (X) serializer.deserialize(input); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java new file mode 100644 index 0000000..0f42a65 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class StreamElementSerializerTest { + + @Test + public void testDeepDuplication() { + @SuppressWarnings("unchecked") + TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class); + + @SuppressWarnings("unchecked") + TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class); + + when(serializer1.duplicate()).thenReturn(serializer2); + + StreamElementSerializer<Long> streamRecSer = + new StreamElementSerializer<Long>(serializer1); + + assertEquals(serializer1, streamRecSer.getContainedTypeSerializer()); + + StreamElementSerializer<Long> copy = streamRecSer.duplicate(); + assertNotEquals(copy, streamRecSer); + assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer()); + } + + @Test + public void testBasicProperties() { + StreamElementSerializer<Long> streamRecSer = + new StreamElementSerializer<Long>(LongSerializer.INSTANCE); + + assertFalse(streamRecSer.isImmutableType()); + assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass()); + assertEquals(-1L, streamRecSer.getLength()); + } + + @Test + public void testSerialization() 
throws Exception { + final StreamElementSerializer<String> serializer = + new StreamElementSerializer<String>(StringSerializer.INSTANCE); + + StreamRecord<String> withoutTimestamp = new StreamRecord<>("test 1 2 分享基督耶穌的愛給們，開拓雙贏!"); + assertEquals(withoutTimestamp, serializeAndDeserialize(withoutTimestamp, serializer)); + + StreamRecord<String> withTimestamp = new StreamRecord<>("one more test 拓 們 分", 77L); + assertEquals(withTimestamp, serializeAndDeserialize(withTimestamp, serializer)); + + StreamRecord<String> negativeTimestamp = new StreamRecord<>("他", Long.MIN_VALUE); + assertEquals(negativeTimestamp, serializeAndDeserialize(negativeTimestamp, serializer)); + + Watermark positiveWatermark = new Watermark(13); + assertEquals(positiveWatermark, serializeAndDeserialize(positiveWatermark, serializer)); + + Watermark negativeWatermark = new Watermark(-4647654567676555876L); + assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer)); + } + + @SuppressWarnings("unchecked") + private static <T, X extends StreamElement> X serializeAndDeserialize( + X record, + StreamElementSerializer<T> serializer) throws IOException { + + DataOutputSerializer output = new DataOutputSerializer(32); + serializer.serialize(record, output); + + // additional binary copy step + DataInputDeserializer copyInput = new DataInputDeserializer(output.getByteArray(), 0, output.length()); + DataOutputSerializer copyOutput = new DataOutputSerializer(32); + serializer.copy(copyInput, copyOutput); + + DataInputDeserializer input = new DataInputDeserializer(copyOutput.getByteArray(), 0, copyOutput.length()); + return (X) serializer.deserialize(input); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java deleted file mode 100644 index bdeb552..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.streamrecord; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; -import org.junit.Test; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class StreamRecordSerializerTest { - - @Test - public void testDeepDuplication() { - try { - @SuppressWarnings("unchecked") - TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class); - @SuppressWarnings("unchecked") - TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class); - - when(serializer1.duplicate()).thenReturn(serializer2); - - StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(serializer1); - assertEquals(serializer1, streamRecSer.getContainedTypeSerializer()); - - StreamRecordSerializer<Long> copy = streamRecSer.duplicate(); - assertNotEquals(copy, streamRecSer); - assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testBasicProperties() { - try { - StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(LongSerializer.INSTANCE); - - assertFalse(streamRecSer.isImmutableType()); - assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass()); - assertEquals(LongSerializer.INSTANCE.getLength(), streamRecSer.getLength()); - - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testDeserializedValuesHaveNoTimestamps() throws Exception { - final StreamRecord<Long> original = new StreamRecord<>(42L); - - StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(LongSerializer.INSTANCE); - - 
DataOutputSerializer buffer = new DataOutputSerializer(16); - streamRecSer.serialize(original, buffer); - - DataInputDeserializer input = new DataInputDeserializer(buffer.getByteArray(), 0, buffer.length()); - StreamRecord<Long> result = streamRecSer.deserialize(input); - - assertFalse(result.hasTimestamp()); - assertEquals(original, result); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/71d2e3ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index cbb5a9d..ce62624 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.junit.Assert; @@ -110,7 +110,7 @@ public class StreamTaskTestHarness<OUT> { streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); outputSerializer = outputType.createSerializer(executionConfig); - outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer); + outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer); } public 
TimeServiceProvider getTimerService() {
