TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object
reuse
URL: https://github.com/apache/flink/pull/6643
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git a/docs/dev/execution_configuration.md
b/docs/dev/execution_configuration.md
index f0103b0f39f..3991ab59ac5 100644
--- a/docs/dev/execution_configuration.md
+++ b/docs/dev/execution_configuration.md
@@ -59,7 +59,7 @@ With the closure cleaner disabled, it might happen that an
anonymous user functi
- `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by
default. Forces the Flink AvroTypeInformation to use the Avro serializer
instead of Kryo for serializing Avro POJOs.
-- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are
not reused in Flink. Enabling the object reuse mode will instruct the runtime
to reuse user objects for better performance. Keep in mind that this can lead
to bugs when the user-code function of an operation is not aware of this
behavior.
+- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are
not reused in Flink. Enabling the object reuse mode will instruct the runtime
to reuse user objects for better performance. Keep in mind that this can lead
to bugs when the user-defined function of an operation is not aware of this
behavior.
- **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status
updates are printed to `System.out` by default. This setting allows to disable
this behavior.
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 59fa803791a..d36fd296562 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -601,8 +601,8 @@ public boolean isForceAvroEnabled() {
/**
* Enables reusing objects that Flink internally uses for
deserialization and passing
- * data to user-code functions. Keep in mind that this can lead to bugs
when the
- * user-code function of an operation is not aware of this behaviour.
+ * data to user-defined functions. Keep in mind that this can lead to
bugs when the
+ * user-defined function of an operation is not aware of this behaviour.
*/
public ExecutionConfig enableObjectReuse() {
objectReuse = true;
@@ -611,7 +611,7 @@ public ExecutionConfig enableObjectReuse() {
/**
* Disables reusing objects that Flink internally uses for
deserialization and passing
- * data to user-code functions. @see #enableObjectReuse()
+ * data to user-defined functions. @see #enableObjectReuse()
*/
public ExecutionConfig disableObjectReuse() {
objectReuse = false;
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d8ba29ae478..f4185f4c129 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -51,7 +51,7 @@ class TableSinkITCase(
val input = CollectionDataSets.get3TupleDataSet(env)
.map(x => x).setParallelism(4) // increase DOP to 4
- val results = input.toTable(tEnv, 'a, 'b, 'c)
+ input.toTable(tEnv, 'a, 'b, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
.writeToSink(new CsvTableSink(path, fieldDelim = "|"))
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 70e59f3d24d..95cb1df1b04 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -21,10 +21,12 @@ package org.apache.flink.table.runtime.stream.table
import java.io.File
import java.lang.{Boolean => JBool}
+import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
@@ -54,7 +56,7 @@ class TableSinkITCase extends AbstractTestBase {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSourceSinkUtil.clear
+ MemoryTableSourceSinkUtil.clear()
val input = StreamTestData.get3TupleDataStream(env)
.assignAscendingTimestamps(r => r._2)
@@ -560,8 +562,7 @@ private[flink] class TestAppendSink extends
AppendStreamTableSink[Row] {
var fTypes: Array[TypeInformation[_]] = _
override def emitDataStream(s: DataStream[Row]): Unit = {
- s.map(
- new MapFunction[Row, JTuple2[JBool, Row]] {
+ s.map(new MapFunction[Row, JTuple2[JBool, Row]] {
override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true,
value)
})
.addSink(new RowSink)
@@ -661,12 +662,11 @@ object RowCollector {
new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
def addValue(value: JTuple2[JBool, Row]): Unit = {
+ // make a deep copy
+ val copy = new JTuple2[JBool, Row](value.f0,
+ new KryoSerializer(classOf[Row], new ExecutionConfig()).copy(value.f1))
- // make a copy
- val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1))
- sink.synchronized {
- sink += copy
- }
+ sink.synchronized { sink += copy }
}
def getAndClearValues: List[JTuple2[JBool, Row]] = {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index a9c64b5f6fe..769605ba704 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -36,6 +36,7 @@
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -109,6 +110,8 @@
private boolean isFinished;
+ private IN reusedObject;
+
@SuppressWarnings("unchecked")
public StreamInputProcessor(
InputGate[] inputGates,
@@ -121,7 +124,8 @@ public StreamInputProcessor(
StreamStatusMaintainer streamStatusMaintainer,
OneInputStreamOperator<IN, ?> streamOperator,
TaskIOMetricGroup metrics,
- WatermarkGauge watermarkGauge) throws IOException {
+ WatermarkGauge watermarkGauge,
+ boolean objectReuse) throws IOException {
InputGate inputGate = InputGateUtil.createInputGate(inputGates);
@@ -131,7 +135,10 @@ public StreamInputProcessor(
this.lock = checkNotNull(lock);
StreamElementSerializer<IN> ser = new
StreamElementSerializer<>(inputSerializer);
- this.deserializationDelegate = new
NonReusingDeserializationDelegate<>(ser);
+ this.deserializationDelegate = objectReuse ?
+ new ReusingDeserializationDelegate<>(ser) :
+ new NonReusingDeserializationDelegate<>(ser);
+ this.reusedObject = objectReuse ?
inputSerializer.createInstance() : null;
// Initialize one deserializer per input channel
this.recordDeserializers = new
SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
@@ -169,6 +176,10 @@ public boolean processInput() throws Exception {
while (true) {
if (currentRecordDeserializer != null) {
+ if (reusedObject != null){
+ deserializationDelegate.setInstance(new
StreamRecord<>(reusedObject));
+ }
+
DeserializationResult result =
currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
@@ -194,6 +205,8 @@ public boolean processInput() throws Exception {
}
continue;
} else {
+ reusedObject =
((StreamRecord<IN>) recordOrMark).getValue();
+
// now we can do the actual
processing
StreamRecord<IN> record =
recordOrMark.asRecord();
synchronized (lock) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ab4f90dcf23..531d6cbf28c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -36,6 +36,7 @@
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -125,6 +126,9 @@
private boolean isFinished;
+ private IN1 reusedObject1;
+ private IN2 reusedObject2;
+
@SuppressWarnings("unchecked")
public StreamTwoInputProcessor(
Collection<InputGate> inputGates1,
@@ -140,7 +144,8 @@ public StreamTwoInputProcessor(
TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
TaskIOMetricGroup metrics,
WatermarkGauge input1WatermarkGauge,
- WatermarkGauge input2WatermarkGauge) throws IOException
{
+ WatermarkGauge input2WatermarkGauge,
+ boolean objectReuse) throws IOException {
final InputGate inputGate =
InputGateUtil.createInputGate(inputGates1, inputGates2);
@@ -150,10 +155,17 @@ public StreamTwoInputProcessor(
this.lock = checkNotNull(lock);
StreamElementSerializer<IN1> ser1 = new
StreamElementSerializer<>(inputSerializer1);
- this.deserializationDelegate1 = new
NonReusingDeserializationDelegate<>(ser1);
+ this.deserializationDelegate1 = objectReuse ?
+ new ReusingDeserializationDelegate<>(ser1) :
+ new NonReusingDeserializationDelegate<>(ser1);
+ this.reusedObject1 = objectReuse ?
inputSerializer1.createInstance() : null;
StreamElementSerializer<IN2> ser2 = new
StreamElementSerializer<>(inputSerializer2);
- this.deserializationDelegate2 = new
NonReusingDeserializationDelegate<>(ser2);
+ this.deserializationDelegate2 = objectReuse ?
+ new ReusingDeserializationDelegate<>(ser2) :
+ new NonReusingDeserializationDelegate<>(ser2);
+
+ this.reusedObject2 = objectReuse ?
inputSerializer2.createInstance() : null;
// Initialize one deserializer per input channel
this.recordDeserializers = new
SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
@@ -203,8 +215,14 @@ public boolean processInput() throws Exception {
if (currentRecordDeserializer != null) {
DeserializationResult result;
if (currentChannel < numInputChannels1) {
+ if (reusedObject1 != null){
+
deserializationDelegate1.setInstance(new StreamRecord<>(reusedObject1));
+ }
result =
currentRecordDeserializer.getNextRecord(deserializationDelegate1);
} else {
+ if (reusedObject2 != null){
+
deserializationDelegate2.setInstance(new StreamRecord<>(reusedObject2));
+ }
result =
currentRecordDeserializer.getNextRecord(deserializationDelegate2);
}
@@ -231,6 +249,8 @@ else if (recordOrWatermark.isLatencyMarker()) {
continue;
}
else {
+ reusedObject1 =
((StreamRecord<IN1>) recordOrWatermark).getValue();
+
StreamRecord<IN1>
record = recordOrWatermark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
@@ -258,6 +278,8 @@ else if (recordOrWatermark.isLatencyMarker()) {
continue;
}
else {
+ reusedObject2 =
((StreamRecord<IN2>) recordOrWatermark).getValue();
+
StreamRecord<IN2>
record = recordOrWatermark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index ed6022ff592..8503eb21d70 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -225,15 +225,15 @@ else if (tag == TAG_LATENCY_MARKER) {
public StreamElement deserialize(StreamElement reuse, DataInputView
source) throws IOException {
int tag = source.readByte();
if (tag == TAG_REC_WITH_TIMESTAMP) {
- long timestamp = source.readLong();
- T value = typeSerializer.deserialize(source);
StreamRecord<T> reuseRecord = reuse.asRecord();
+ long timestamp = source.readLong();
+ T value =
typeSerializer.deserialize(reuseRecord.getValue(), source);
reuseRecord.replace(value, timestamp);
return reuseRecord;
}
else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
- T value = typeSerializer.deserialize(source);
StreamRecord<T> reuseRecord = reuse.asRecord();
+ T value =
typeSerializer.deserialize(reuseRecord.getValue(), source);
reuseRecord.replace(value);
return reuseRecord;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 74985186836..91671be0a5c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -90,7 +90,8 @@ public void init() throws Exception {
getStreamStatusMaintainer(),
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
- inputWatermarkGauge);
+ inputWatermarkGauge,
+
getExecutionConfig().isObjectReuseEnabled());
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK,
this.inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 546ccdb3bfc..1199251dd44 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -100,7 +100,8 @@ public void init() throws Exception {
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
input1WatermarkGauge,
- input2WatermarkGauge);
+ input2WatermarkGauge,
+ getExecutionConfig().isObjectReuseEnabled());
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK,
minInputWatermarkGauge);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK,
input1WatermarkGauge);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 96eaa78ed1a..94fdd0fc1b5 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -24,9 +24,13 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Counter;
@@ -79,6 +83,7 @@
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -108,20 +113,20 @@ public void testOpenCloseAndTimestamps() throws Exception
{
testHarness.setupOutputForSingletonOperatorChain();
StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<String,
String>(new TestOpenCloseMapFunction());
+ StreamMap<String, String> mapOperator = new StreamMap<>(new
TestOpenCloseMapFunction());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());
long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processElement(new StreamRecord<String>("Hello",
initialTime + 1));
- testHarness.processElement(new StreamRecord<String>("Ciao",
initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("Hello",
initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("Ciao", initialTime
+ 2));
+ testHarness.processElement(new StreamRecord<>("Hello",
initialTime + 1));
+ testHarness.processElement(new StreamRecord<>("Ciao",
initialTime + 2));
+ expectedOutput.add(new StreamRecord<>("Hello", initialTime +
1));
+ expectedOutput.add(new StreamRecord<>("Ciao", initialTime + 2));
testHarness.endInput();
@@ -152,11 +157,11 @@ public void testWatermarkAndStreamStatusForwarding()
throws Exception {
testHarness.setupOutputForSingletonOperatorChain();
StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<String,
String>(new IdentityMap());
+ StreamMap<String, String> mapOperator = new StreamMap<>(new
IdentityMap());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
long initialTime = 0L;
testHarness.invoke();
@@ -180,10 +185,10 @@ public void testWatermarkAndStreamStatusForwarding()
throws Exception {
testHarness.getOutput());
// contrary to checkpoint barriers these elements are not
blocked by watermarks
- testHarness.processElement(new StreamRecord<String>("Hello",
initialTime));
- testHarness.processElement(new StreamRecord<String>("Ciao",
initialTime));
- expectedOutput.add(new StreamRecord<String>("Hello",
initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao",
initialTime));
+ testHarness.processElement(new StreamRecord<>("Hello",
initialTime));
+ testHarness.processElement(new StreamRecord<>("Ciao",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Hello", initialTime));
+ expectedOutput.add(new StreamRecord<>("Ciao", initialTime));
testHarness.processElement(new Watermark(initialTime + 4), 0,
0);
testHarness.processElement(new Watermark(initialTime + 3), 0,
1);
@@ -274,7 +279,7 @@ public void testWatermarksNotForwardedWithinChainWhenIdle()
throws Exception {
// --------------------- begin test ---------------------
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
testHarness.invoke();
testHarness.waitForTaskRunning();
@@ -364,11 +369,11 @@ public void testCheckpointBarriers() throws Exception {
testHarness.setupOutputForSingletonOperatorChain();
StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<String,
String>(new IdentityMap());
+ StreamMap<String, String> mapOperator = new StreamMap<>(new
IdentityMap());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
long initialTime = 0L;
testHarness.invoke();
@@ -378,16 +383,16 @@ public void testCheckpointBarriers() throws Exception {
// These elements should be buffered until we receive barriers
from
// all inputs
- testHarness.processElement(new
StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
- testHarness.processElement(new StreamRecord<String>("Ciao-0-0",
initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<>("Hello-0-0",
initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<>("Ciao-0-0",
initialTime), 0, 0);
// These elements should be forwarded, since we did not yet
receive a checkpoint barrier
// on that input, only add to same input, otherwise we would
not know the ordering
// of the output since the Task might read the inputs in any
order
- testHarness.processElement(new
StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
- testHarness.processElement(new StreamRecord<String>("Ciao-1-1",
initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("Hello-1-1",
initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-1-1",
initialTime));
+ testHarness.processElement(new StreamRecord<>("Hello-1-1",
initialTime), 1, 1);
+ testHarness.processElement(new StreamRecord<>("Ciao-1-1",
initialTime), 1, 1);
+ expectedOutput.add(new StreamRecord<>("Hello-1-1",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime));
testHarness.waitForInputProcessing();
// we should not yet see the barrier, only the two elements
from non-blocked input
@@ -401,8 +406,8 @@ public void testCheckpointBarriers() throws Exception {
// now we should see the barrier and after that the buffered
elements
expectedOutput.add(new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation()));
- expectedOutput.add(new StreamRecord<String>("Hello-0-0",
initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-0-0",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Hello-0-0",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
testHarness.endInput();
@@ -427,11 +432,11 @@ public void testOvertakingCheckpointBarriers() throws
Exception {
testHarness.setupOutputForSingletonOperatorChain();
StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<String,
String>(new IdentityMap());
+ StreamMap<String, String> mapOperator = new StreamMap<>(new
IdentityMap());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
long initialTime = 0L;
testHarness.invoke();
@@ -441,16 +446,16 @@ public void testOvertakingCheckpointBarriers() throws
Exception {
// These elements should be buffered until we receive barriers
from
// all inputs
- testHarness.processElement(new
StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
- testHarness.processElement(new StreamRecord<String>("Ciao-0-0",
initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<>("Hello-0-0",
initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<>("Ciao-0-0",
initialTime), 0, 0);
// These elements should be forwarded, since we did not yet
receive a checkpoint barrier
// on that input, only add to same input, otherwise we would
not know the ordering
// of the output since the Task might read the inputs in any
order
- testHarness.processElement(new
StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
- testHarness.processElement(new StreamRecord<String>("Ciao-1-1",
initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("Hello-1-1",
initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-1-1",
initialTime));
+ testHarness.processElement(new StreamRecord<>("Hello-1-1",
initialTime), 1, 1);
+ testHarness.processElement(new StreamRecord<>("Ciao-1-1",
initialTime), 1, 1);
+ expectedOutput.add(new StreamRecord<>("Hello-1-1",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime));
testHarness.waitForInputProcessing();
// we should not yet see the barrier, only the two elements
from non-blocked input
@@ -464,8 +469,8 @@ public void testOvertakingCheckpointBarriers() throws
Exception {
testHarness.processEvent(new CheckpointBarrier(1, 1,
CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
expectedOutput.add(new CancelCheckpointMarker(0));
- expectedOutput.add(new StreamRecord<String>("Hello-0-0",
initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-0-0",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Hello-0-0",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
expectedOutput.add(new CheckpointBarrier(1, 1,
CheckpointOptions.forCheckpointWithDefaultLocation()));
testHarness.waitForInputProcessing();
@@ -759,6 +764,95 @@ public void processWatermark(Watermark mark) throws
Exception {
}
}
+ @Test
+ public void testMutableObjectReuse() throws Exception {
+ final OneInputStreamTaskTestHarness<String, String> testHarness
= new OneInputStreamTaskTestHarness<>(
+ OneInputStreamTask::new,
+ new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
+ new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO));
+
+ testHarness.setupOperatorChain(new OperatorID(), new
TestMutableObjectReuseOperator(true))
+ .chain(new OperatorID(), new
TestMutableObjectReuseOperator(),
+ new TupleSerializer(Tuple2.class,
+ new TypeSerializer<?>[]{
+
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())}))
+ .finish();
+
+ ExecutionConfig executionConfig =
testHarness.getExecutionConfig();
+ executionConfig.enableObjectReuse();
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+
+ testHarness.invoke();
+ testHarness.waitForTaskRunning();
+
+ testHarness.processElement(new
StreamRecord<>(Tuple2.of("Hello", 1)));
+ testHarness.processElement(new
StreamRecord<>(Tuple2.of("Hello", 2), initialTime + 1));
+ testHarness.processElement(new Watermark(initialTime + 1));
+ testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao",
1), initialTime + 2));
+ testHarness.processEvent(new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation()));
+ testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao",
2), initialTime + 3));
+
+ expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 1)));
+ expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 2),
initialTime + 1));
+ expectedOutput.add(new Watermark(initialTime + 1));
+ expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 1),
initialTime + 2));
+ expectedOutput.add(new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation()));
+ expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 2),
initialTime + 3));
+
+ testHarness.endInput();
+
+ testHarness.waitForTaskCompletion();
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput());
+ }
+
+ // This must only be used in one test, otherwise the static fields will
be changed
+ // by several tests concurrently
+ private static class TestMutableObjectReuseOperator
+ extends AbstractStreamOperator<Tuple2<String, Integer>>
+ implements OneInputStreamOperator<Tuple2<String, Integer>,
Tuple2<String, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isHeadOperator;
+ private static Object headOperatorValue;
+
+ private Object prevRecord = null;
+ private Object prevValue = null;
+
+ public TestMutableObjectReuseOperator() {
+ this.isHeadOperator = false;
+ }
+
+ public TestMutableObjectReuseOperator(boolean isHeadOperator) {
+ this.isHeadOperator = isHeadOperator;
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<String,
Integer>> element) throws Exception {
+ if (isHeadOperator) {
+ if (prevRecord != null) {
+ assertNotEquals("Reuse StreamRecord
object in the head operator.", prevRecord, element);
+ assertEquals("No reuse value object in
the head operator.", prevValue, element.getValue());
+ }
+
+ prevRecord = element;
+ prevValue = element.getValue();
+
+ headOperatorValue = element.getValue();
+ } else {
+ assertEquals("No reuse value object in chain.",
headOperatorValue, element.getValue());
+ }
+
+ output.collect(element);
+ }
+ }
+
//==============================================================================================
// Utility functions and classes
//==============================================================================================
@@ -807,7 +901,7 @@ private void configureChainedTestingStreamOperator(
null
),
0,
- Collections.<String>emptyList(),
+ Collections.emptyList(),
null,
null
);
@@ -973,12 +1067,16 @@ protected void handleWatermark(Watermark mark) {
public void processElement(StreamRecord<String> element) throws
Exception {
output.collect(element);
- if
(element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
- this.expectForwardedWatermarks = true;
- } else if
(element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) {
- this.expectForwardedWatermarks = false;
- } else {
- handleElement(element);
+ switch (element.getValue()) {
+ case EXPECT_FORWARDED_WATERMARKS_MARKER:
+ this.expectForwardedWatermarks = true;
+ break;
+ case NO_FORWARDED_WATERMARKS_MARKER:
+ this.expectForwardedWatermarks = false;
+ break;
+ default:
+ handleElement(element);
+ break;
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 5d151573582..e80fdc17154 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,10 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -40,6 +44,7 @@
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -56,6 +61,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
/**
* Tests for {@link
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests
@@ -83,31 +90,31 @@ public void testOpenCloseAndTimestamps() throws Exception {
testHarness.setupOutputForSingletonOperatorChain();
StreamConfig streamConfig = testHarness.getStreamConfig();
- CoStreamMap<String, Integer, String> coMapOperator = new
CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
+ CoStreamMap<String, Integer, String> coMapOperator = new
CoStreamMap<>(new TestOpenCloseMapFunction());
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());
long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processElement(new StreamRecord<String>("Hello",
initialTime + 1), 0, 0);
- expectedOutput.add(new StreamRecord<String>("Hello",
initialTime + 1));
+ testHarness.processElement(new StreamRecord<>("Hello",
initialTime + 1), 0, 0);
+ expectedOutput.add(new StreamRecord<>("Hello", initialTime +
1));
// wait until the input is processed to ensure ordering of the
output
testHarness.waitForInputProcessing();
- testHarness.processElement(new StreamRecord<Integer>(1337,
initialTime + 2), 1, 0);
+ testHarness.processElement(new StreamRecord<>(1337, initialTime
+ 2), 1, 0);
- expectedOutput.add(new StreamRecord<String>("1337", initialTime
+ 2));
+ expectedOutput.add(new StreamRecord<>("1337", initialTime + 2));
testHarness.endInput();
testHarness.waitForTaskCompletion();
- Assert.assertTrue("RichFunction methods where not called.",
TestOpenCloseMapFunction.closeCalled);
+ assertTrue("RichFunction methods where not called.",
TestOpenCloseMapFunction.closeCalled);
TestHarnessUtil.assertOutputEquals("Output was not correct.",
expectedOutput, testHarness.getOutput());
}
@@ -122,7 +129,7 @@ public void testOpenCloseAndTimestamps() throws Exception {
public void testWatermarkAndStreamStatusForwarding() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String>
testHarness =
- new TwoInputStreamTaskTestHarness<String, Integer,
String>(
+ new TwoInputStreamTaskTestHarness<>(
TwoInputStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO,
@@ -131,11 +138,11 @@ public void testWatermarkAndStreamStatusForwarding()
throws Exception {
testHarness.setupOutputForSingletonOperatorChain();
StreamConfig streamConfig = testHarness.getStreamConfig();
- CoStreamMap<String, Integer, String> coMapOperator = new
CoStreamMap<String, Integer, String>(new IdentityMap());
+ CoStreamMap<String, Integer, String> coMapOperator = new
CoStreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
long initialTime = 0L;
testHarness.invoke();
@@ -158,10 +165,10 @@ public void testWatermarkAndStreamStatusForwarding()
throws Exception {
TestHarnessUtil.assertOutputEquals("Output was not correct.",
expectedOutput, testHarness.getOutput());
// contrary to checkpoint barriers these elements are not
blocked by watermarks
- testHarness.processElement(new StreamRecord<String>("Hello",
initialTime), 0, 0);
- testHarness.processElement(new StreamRecord<Integer>(42,
initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("Hello",
initialTime));
- expectedOutput.add(new StreamRecord<String>("42", initialTime));
+ testHarness.processElement(new StreamRecord<>("Hello",
initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<>(42, initialTime),
1, 1);
+ expectedOutput.add(new StreamRecord<>("Hello", initialTime));
+ expectedOutput.add(new StreamRecord<>("42", initialTime));
testHarness.waitForInputProcessing();
TestHarnessUtil.assertOutputEquals("Output was not correct.",
expectedOutput, testHarness.getOutput());
@@ -234,18 +241,18 @@ public void testWatermarkAndStreamStatusForwarding()
throws Exception {
@SuppressWarnings("unchecked")
public void testCheckpointBarriers() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String>
testHarness =
- new TwoInputStreamTaskTestHarness<String,
Integer, String>(
+ new TwoInputStreamTaskTestHarness<>(
TwoInputStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
StreamConfig streamConfig = testHarness.getStreamConfig();
- CoStreamMap<String, Integer, String> coMapOperator = new
CoStreamMap<String, Integer, String>(new IdentityMap());
+ CoStreamMap<String, Integer, String> coMapOperator = new
CoStreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
long initialTime = 0L;
testHarness.invoke();
@@ -255,21 +262,21 @@ public void testCheckpointBarriers() throws Exception {
// This element should be buffered since we received a
checkpoint barrier on
// this input
- testHarness.processElement(new
StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<>("Hello-0-0",
initialTime), 0, 0);
// This one should go through
- testHarness.processElement(new StreamRecord<String>("Ciao-0-0",
initialTime), 0, 1);
- expectedOutput.add(new StreamRecord<String>("Ciao-0-0",
initialTime));
+ testHarness.processElement(new StreamRecord<>("Ciao-0-0",
initialTime), 0, 1);
+ expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
testHarness.waitForInputProcessing();
// These elements should be forwarded, since we did not yet
receive a checkpoint barrier
// on that input, only add to same input, otherwise we would
not know the ordering
// of the output since the Task might read the inputs in any
order
- testHarness.processElement(new StreamRecord<Integer>(11,
initialTime), 1, 1);
- testHarness.processElement(new StreamRecord<Integer>(111,
initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("11", initialTime));
- expectedOutput.add(new StreamRecord<String>("111",
initialTime));
+ testHarness.processElement(new StreamRecord<>(11, initialTime),
1, 1);
+ testHarness.processElement(new StreamRecord<>(111,
initialTime), 1, 1);
+ expectedOutput.add(new StreamRecord<>("11", initialTime));
+ expectedOutput.add(new StreamRecord<>("111", initialTime));
testHarness.waitForInputProcessing();
@@ -298,7 +305,7 @@ public void testCheckpointBarriers() throws Exception {
// now we should see the barrier and after that the buffered
elements
expectedOutput.add(new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation()));
- expectedOutput.add(new StreamRecord<String>("Hello-0-0",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Hello-0-0",
initialTime));
TestHarnessUtil.assertOutputEquals("Output was not correct.",
expectedOutput,
@@ -326,11 +333,11 @@ public void testOvertakingCheckpointBarriers() throws
Exception {
testHarness.setupOutputForSingletonOperatorChain();
StreamConfig streamConfig = testHarness.getStreamConfig();
- CoStreamMap<String, Integer, String> coMapOperator = new
CoStreamMap<String, Integer, String>(new IdentityMap());
+ CoStreamMap<String, Integer, String> coMapOperator = new
CoStreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());
- ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
long initialTime = 0L;
testHarness.invoke();
@@ -340,16 +347,16 @@ public void testOvertakingCheckpointBarriers() throws
Exception {
// These elements should be buffered until we receive barriers
from
// all inputs
- testHarness.processElement(new
StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
- testHarness.processElement(new StreamRecord<String>("Ciao-0-0",
initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<>("Hello-0-0",
initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<>("Ciao-0-0",
initialTime), 0, 0);
// These elements should be forwarded, since we did not yet
receive a checkpoint barrier
// on that input, only add to same input, otherwise we would
not know the ordering
// of the output since the Task might read the inputs in any
order
- testHarness.processElement(new StreamRecord<Integer>(42,
initialTime), 1, 1);
- testHarness.processElement(new StreamRecord<Integer>(1337,
initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("42", initialTime));
- expectedOutput.add(new StreamRecord<String>("1337",
initialTime));
+ testHarness.processElement(new StreamRecord<>(42, initialTime),
1, 1);
+ testHarness.processElement(new StreamRecord<>(1337,
initialTime), 1, 1);
+ expectedOutput.add(new StreamRecord<>("42", initialTime));
+ expectedOutput.add(new StreamRecord<>("1337", initialTime));
testHarness.waitForInputProcessing();
// we should not yet see the barrier, only the two elements
from non-blocked input
@@ -365,8 +372,8 @@ public void testOvertakingCheckpointBarriers() throws
Exception {
testHarness.processEvent(new CheckpointBarrier(1, 1,
CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
expectedOutput.add(new CancelCheckpointMarker(0));
- expectedOutput.add(new StreamRecord<String>("Hello-0-0",
initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-0-0",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Hello-0-0",
initialTime));
+ expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
expectedOutput.add(new CheckpointBarrier(1, 1,
CheckpointOptions.forCheckpointWithDefaultLocation()));
testHarness.waitForInputProcessing();
@@ -611,5 +618,118 @@ public String map2(Integer value) throws Exception {
return value.toString();
}
}
+
+ @Test
+ public void testMutableObjectReuse() throws Exception {
+ final TwoInputStreamTaskTestHarness<String, String, String>
testHarness = new TwoInputStreamTaskTestHarness<>(
+ TwoInputStreamTask::new,
+ new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
+ new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
+ new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO));
+
+ testHarness.setupOperatorChain(new OperatorID(), new
TestMutableObjectReuseHeadOperator())
+ .chain(new OperatorID(), new
TestMutableObjectReuseHeadOperator.TestMutableObjectReuseNextOperator(),
+ new TupleSerializer(Tuple2.class,
+ new
TypeSerializer<?>[]{BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new
ExecutionConfig()), BasicTypeInfo.INT_TYPE_INFO.createSerializer(new
ExecutionConfig())}))
+ .finish();
+
+ ExecutionConfig executionConfig =
testHarness.getExecutionConfig();
+ executionConfig.enableObjectReuse();
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<Object>();
+
+ testHarness.invoke();
+ testHarness.waitForTaskRunning();
+
+ testHarness.processElement(new
StreamRecord<>(Tuple2.of("Hello", 1)), 0, 0);
+ testHarness.processElement(new
StreamRecord<>(Tuple2.of("Hello", 2), initialTime + 1), 1, 0);
+ testHarness.processElement(new Watermark(initialTime + 1), 0,
0);
+ testHarness.processElement(new Watermark(initialTime + 1), 1,
0);
+ testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao",
1), initialTime + 2), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
+ testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao",
2), initialTime + 3), 1, 0);
+
+ expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 1)));
+ expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 2),
initialTime + 1));
+ expectedOutput.add(new Watermark(initialTime + 1));
+ expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 1),
initialTime + 2));
+ expectedOutput.add(new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation()));
+ expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 2),
initialTime + 3));
+
+ testHarness.endInput();
+
+ testHarness.waitForTaskCompletion();
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput());
+ }
+
+ // This must only be used in one test, otherwise the static fields will
be changed
+ // by several tests concurrently
+ private static class TestMutableObjectReuseHeadOperator
+ extends AbstractStreamOperator<Tuple2<String, Integer>>
+ implements TwoInputStreamOperator<Tuple2<String, Integer>,
Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static Object headOperatorValue;
+
+ private Object prevRecord1 = null;
+ private Object prevValue1 = null;
+
+ private Object prevRecord2 = null;
+ private Object prevValue2 = null;
+
+ @Override
+ public void processElement1(StreamRecord<Tuple2<String,
Integer>> element) throws Exception {
+ if (prevRecord1 != null) {
+ assertNotEquals("Reuse StreamRecord object in
the 1th input of the head operator.", prevRecord1, element);
+ assertEquals("No reuse value object in the 1th
input of the head operator.", prevValue1, element.getValue());
+ }
+
+ prevRecord1 = element;
+ prevValue1 = element.getValue();
+
+ headOperatorValue = element.getValue();
+
+ output.collect(element);
+ }
+
+ @Override
+ public void processElement2(StreamRecord<Tuple2<String,
Integer>> element) {
+ if (prevRecord2 != null) {
+ assertNotEquals("Reuse StreamRecord object in
the 2th input of the head operator.", prevRecord2, element);
+ assertEquals("No reuse value object in the 2th
input of the head operator.", prevValue2, element.getValue());
+
+ if (prevValue1 != null) {
+ assertTrue("Reuse the same value object
in two inputs of the head operator.", prevValue1 != prevValue2);
+ }
+ }
+
+ prevRecord2 = element;
+ prevValue2 = element.getValue();
+
+ headOperatorValue = element.getValue();
+
+ output.collect(element);
+ }
+
+ private static class TestMutableObjectReuseNextOperator
+ extends AbstractStreamOperator<Tuple2<String, Integer>>
+ implements OneInputStreamOperator<Tuple2<String,
Integer>, Tuple2<String, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<String,
Integer>> element) throws Exception {
+ assertEquals("No reuse value object in chain.",
headOperatorValue, element.getValue());
+
+ output.collect(element);
+ }
+ }
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services