This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 35d1362 [FLINK-11885][network] Introduce RecordWriterBuilder for
creating RecordWriter instance
35d1362 is described below
commit 35d13628427fe0d05ed79efce151cd63ee1eb911
Author: Zhijiang <[email protected]>
AuthorDate: Tue Mar 12 17:45:23 2019 +0800
[FLINK-11885][network] Introduce RecordWriterBuilder for creating
RecordWriter instance
---
.../network/api/writer/BroadcastRecordWriter.java | 2 +-
.../io/network/api/writer/RecordWriter.java | 30 +------------
...tRecordWriter.java => RecordWriterBuilder.java} | 49 ++++++++++++----------
.../runtime/iterative/task/IterationHeadTask.java | 3 +-
.../apache/flink/runtime/operators/BatchTask.java | 9 ++--
.../io/network/api/writer/RecordWriterTest.java | 17 ++++----
.../SlotCountExceedingParallelismTest.java | 3 +-
.../scheduler/ScheduleOrUpdateConsumersTest.java | 9 ++--
.../jobmaster/TestingAbstractInvokables.java | 3 +-
.../TaskCancelAsyncProducerConsumerITCase.java | 3 +-
.../flink/runtime/taskmanager/TaskManagerTest.java | 3 +-
.../apache/flink/runtime/jobmanager/Tasks.scala | 5 ++-
.../flink/streaming/runtime/tasks/StreamTask.java | 8 +++-
.../StreamNetworkBenchmarkEnvironment.java | 4 +-
.../test/runtime/NetworkStackThroughputITCase.java | 5 ++-
15 files changed, 72 insertions(+), 81 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 82cc167..effff59 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -32,7 +32,7 @@ import java.io.IOException;
*/
public class BroadcastRecordWriter<T extends IOReadableWritable> extends
RecordWriter<T> {
- public BroadcastRecordWriter(
+ BroadcastRecordWriter(
ResultPartitionWriter writer,
ChannelSelector<T> channelSelector,
long timeout,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 8817a3f..1743576 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -87,15 +87,7 @@ public class RecordWriter<T extends IOReadableWritable> {
/** To avoid synchronization overhead on the critical path, best-effort
error tracking is enough here.*/
private Throwable flusherException;
- public RecordWriter(ResultPartitionWriter writer) {
- this(writer, new RoundRobinChannelSelector<T>(), -1, null);
- }
-
- public RecordWriter(
- ResultPartitionWriter writer,
- ChannelSelector<T> channelSelector,
- long timeout,
- String taskName) {
+ RecordWriter(ResultPartitionWriter writer, ChannelSelector<T>
channelSelector, long timeout, String taskName) {
this.targetPartition = writer;
this.channelSelector = channelSelector;
this.numberOfChannels = writer.getNumberOfSubpartitions();
@@ -310,28 +302,8 @@ public class RecordWriter<T extends IOReadableWritable> {
}
}
- public static RecordWriter createRecordWriter(
- ResultPartitionWriter writer,
- ChannelSelector channelSelector,
- long timeout,
- String taskName) {
- if (channelSelector.isBroadcast()) {
- return new BroadcastRecordWriter<>(writer,
channelSelector, timeout, taskName);
- } else {
- return new RecordWriter<>(writer, channelSelector,
timeout, taskName);
- }
- }
-
- public static RecordWriter createRecordWriter(
- ResultPartitionWriter writer,
- ChannelSelector channelSelector,
- String taskName) {
- return createRecordWriter(writer, channelSelector, -1,
taskName);
- }
-
//
------------------------------------------------------------------------
-
/**
* A dedicated thread that periodically flushes the output buffers, to
set upper latency bounds.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterBuilder.java
similarity index 51%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterBuilder.java
index 82cc167..79b372b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterBuilder.java
@@ -18,30 +18,37 @@
package org.apache.flink.runtime.io.network.api.writer;
-import org.apache.flink.core.io.IOReadableWritable;
-
-import java.io.IOException;
-
/**
- * A special record-oriented runtime result writer only for broadcast mode.
- *
- * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and handles
{@link #emit(IOReadableWritable)}
- * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more
efficient way.
- *
- * @param <T> the type of the record that can be emitted with this record
writer
+ * Utility class to encapsulate the logic of building a {@link RecordWriter}
instance.
*/
-public class BroadcastRecordWriter<T extends IOReadableWritable> extends
RecordWriter<T> {
-
- public BroadcastRecordWriter(
- ResultPartitionWriter writer,
- ChannelSelector<T> channelSelector,
- long timeout,
- String taskName) {
- super(writer, channelSelector, timeout, taskName);
+public class RecordWriterBuilder {
+
+ private ChannelSelector selector = new RoundRobinChannelSelector();
+
+ private long timeout = -1;
+
+ private String taskName = "test";
+
+ public RecordWriterBuilder setChannelSelector(ChannelSelector selector)
{
+ this.selector = selector;
+ return this;
+ }
+
+ public RecordWriterBuilder setTimeout(long timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ public RecordWriterBuilder setTaskName(String taskName) {
+ this.taskName = taskName;
+ return this;
}
- @Override
- public void emit(T record) throws IOException, InterruptedException {
- broadcastEmit(record);
+ public RecordWriter build(ResultPartitionWriter writer) {
+ if (selector.isBroadcast()) {
+ return new BroadcastRecordWriter(writer, selector,
timeout, taskName);
+ } else {
+ return new RecordWriter(writer, selector, timeout,
taskName);
+ }
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 9f9af36..2ee76fc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
@@ -142,7 +143,7 @@ public class IterationHeadTask<X, Y, S extends Function,
OT> extends AbstractIte
throw new Exception("Error: Inconsistent head task
setup - wrong mapping of output gates.");
}
// now, we can instantiate the sync gate
- this.toSync = new
RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
+ this.toSync = new
RecordWriterBuilder().build(getEnvironment().getWriter(syncGateIndex));
this.toSyncPartitionId =
getEnvironment().getWriter(syncGateIndex).getPartitionId();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 864411c..5511d28 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -39,6 +39,7 @@ import
org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -1250,10 +1251,10 @@ public class BatchTask<S extends Function, OT> extends
AbstractInvokable impleme
oe = new OutputEmitter<T>(strategy,
indexInSubtaskGroup, comparator, partitioner, dataDist);
}
- final RecordWriter<SerializationDelegate<T>>
recordWriter = RecordWriter.createRecordWriter(
- task.getEnvironment().getWriter(outputOffset +
i),
- oe,
-
task.getEnvironment().getTaskInfo().getTaskName());
+ final RecordWriter<SerializationDelegate<T>>
recordWriter = new RecordWriterBuilder()
+ .setChannelSelector(oe)
+
.setTaskName(task.getEnvironment().getTaskInfo().getTaskName())
+
.build(task.getEnvironment().getWriter(outputOffset + i));
recordWriter.setMetricGroup(task.getEnvironment().getMetricGroup().getIOMetricGroup());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index c6d0e02..9b496ba 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -130,7 +130,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter = new
RecyclingPartitionWriter(bufferProvider);
- final RecordWriter<IntValue> recordWriter = new
RecordWriter<IntValue>(partitionWriter);
+ final RecordWriter<IntValue> recordWriter = new
RecordWriterBuilder().build(partitionWriter);
Future<?> result = executor.submit(new Callable<Void>()
{
@Override
@@ -182,7 +182,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter =
spy(new RecyclingPartitionWriter(new
TestPooledBufferProvider(1, 16)));
- RecordWriter<IntValue> recordWriter = new
RecordWriter<>(partitionWriter);
+ RecordWriter<IntValue> recordWriter = new
RecordWriterBuilder().build(partitionWriter);
// Fill a buffer, but don't write it out.
recordWriter.emit(new IntValue(0));
@@ -212,7 +212,7 @@ public class RecordWriterTest {
TestPooledBufferProvider bufferProvider = new
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
ResultPartitionWriter partitionWriter = new
CollectingPartitionWriter(queues, bufferProvider);
- RecordWriter<ByteArrayIO> writer = new
RecordWriter<>(partitionWriter);
+ RecordWriter<ByteArrayIO> writer = new
RecordWriterBuilder().build(partitionWriter);
CheckpointBarrier barrier = new
CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L,
CheckpointOptions.forCheckpointWithDefaultLocation());
// No records emitted yet, broadcast should not request a buffer
@@ -249,7 +249,7 @@ public class RecordWriterTest {
TestPooledBufferProvider bufferProvider = new
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
ResultPartitionWriter partitionWriter = new
CollectingPartitionWriter(queues, bufferProvider);
- RecordWriter<ByteArrayIO> writer = new
RecordWriter<>(partitionWriter);
+ RecordWriter<ByteArrayIO> writer = new
RecordWriterBuilder().build(partitionWriter);
CheckpointBarrier barrier = new
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L,
CheckpointOptions.forCheckpointWithDefaultLocation());
// Emit records on some channels first (requesting buffers),
then
@@ -311,7 +311,7 @@ public class RecordWriterTest {
ResultPartitionWriter partition =
new CollectingPartitionWriter(queues, new
TestPooledBufferProvider(Integer.MAX_VALUE));
- RecordWriter<?> writer = new RecordWriter<>(partition);
+ RecordWriter<?> writer = new
RecordWriterBuilder().build(partition);
writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
@@ -373,7 +373,7 @@ public class RecordWriterTest {
ResultPartitionWriter partition =
new CollectingPartitionWriter(queues, new
TestPooledBufferProvider(Integer.MAX_VALUE));
- RecordWriter<IntValue> writer = new RecordWriter<>(partition);
+ RecordWriter<IntValue> writer = new
RecordWriterBuilder().build(partition);
if (broadcastEvent) {
writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
@@ -415,7 +415,10 @@ public class RecordWriterTest {
final TestPooledBufferProvider bufferProvider = new
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
final ResultPartitionWriter partitionWriter = new
CollectingPartitionWriter(queues, bufferProvider);
final ChannelSelector selector = new
OutputEmitter(ShipStrategyType.BROADCAST, 0);
- final RecordWriter<SerializationTestType> writer =
RecordWriter.createRecordWriter(partitionWriter, selector, 0, "test");
+ final RecordWriter<SerializationTestType> writer = new
RecordWriterBuilder()
+ .setChannelSelector(selector)
+ .setTimeout(0)
+ .build(partitionWriter);
final RecordDeserializer<SerializationTestType> deserializer =
new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{ tempFolder.getRoot().getAbsolutePath() });
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 9b16e86..faf2dfe 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -140,7 +141,7 @@ public class SlotCountExceedingParallelismTest extends
TestLogger {
@Override
public void invoke() throws Exception {
- RecordWriter<IntValue> writer = new
RecordWriter<>(getEnvironment().getWriter(0));
+ RecordWriter<IntValue> writer = new
RecordWriterBuilder().build(getEnvironment().getWriter(0));
final int numberOfTimesToSend =
getTaskConfiguration().getInteger(CONFIG_KEY, 0);
final IntValue subtaskIndex = new IntValue(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index f382f04..58af0e2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -147,12 +148,8 @@ public class ScheduleOrUpdateConsumersTest extends
TestLogger {
// The order of intermediate result creation in the job
graph specifies which produced
// result partition is pipelined/blocking.
- final RecordWriter<IntValue> pipelinedWriter =
- new
RecordWriter<>(getEnvironment().getWriter(0));
-
- final RecordWriter<IntValue> blockingWriter =
- new
RecordWriter<>(getEnvironment().getWriter(1));
-
+ final RecordWriter<IntValue> pipelinedWriter = new
RecordWriterBuilder().build(getEnvironment().getWriter(0));
+ final RecordWriter<IntValue> blockingWriter = new
RecordWriterBuilder().build(getEnvironment().getWriter(1));
writers.add(pipelinedWriter);
writers.add(blockingWriter);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
index d227918..4e8d7eb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.types.IntValue;
@@ -44,7 +45,7 @@ public class TestingAbstractInvokables {
@Override
public void invoke() throws Exception {
- final RecordWriter<IntValue> writer = new
RecordWriter<>(getEnvironment().getWriter(0));
+ final RecordWriter<IntValue> writer = new
RecordWriterBuilder().build(getEnvironment().getWriter(0));
try {
writer.emit(new IntValue(42));
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index ddae846..85573af 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -213,7 +214,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends
TestLogger {
private final RecordWriter<LongValue> recordWriter;
public ProducerThread(ResultPartitionWriter
partitionWriter) {
- this.recordWriter = new
RecordWriter<>(partitionWriter);
+ this.recordWriter = new
RecordWriterBuilder().build(partitionWriter);
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index edeb5d1..937fb1e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -2093,7 +2094,7 @@ public class TaskManagerTest extends TestLogger {
@Override
public void invoke() throws Exception {
final Object o = new Object();
- RecordWriter<IntValue> recordWriter = new
RecordWriter<>(getEnvironment().getWriter(0));
+ RecordWriter<IntValue> recordWriter = new
RecordWriterBuilder().build(getEnvironment().getWriter(0));
for (int i = 0; i < 1024; i++) {
recordWriter.emit(new IntValue(42));
diff --git
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 472d15c..ea11f91 100644
---
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager
import org.apache.flink.runtime.execution.Environment
import org.apache.flink.runtime.io.network.api.reader.RecordReader
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter
+import org.apache.flink.runtime.io.network.api.writer.{RecordWriter,
RecordWriterBuilder}
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
import org.apache.flink.types.IntValue
@@ -35,7 +35,8 @@ object Tasks {
classOf[IntValue],
getEnvironment.getTaskManagerInfo.getTmpDirectories)
- val writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
+ val writer = new RecordWriterBuilder().build(
+ getEnvironment.getWriter(0)).asInstanceOf[RecordWriter[IntValue]]
try {
while (true) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 40e31dd..5bc4ca3 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -1198,8 +1199,11 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
}
}
- RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
- RecordWriter.createRecordWriter(bufferWriter,
outputPartitioner, bufferTimeout, taskName);
+ RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
new RecordWriterBuilder()
+ .setChannelSelector(outputPartitioner)
+ .setTimeout(bufferTimeout)
+ .setTaskName(taskName)
+ .build(bufferWriter);
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
return output;
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index ed14db4..4acd1f3 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -34,8 +34,8 @@ import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import
org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
@@ -183,7 +183,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends
IOReadableWritable> {
public RecordWriter<T> createRecordWriter(int partitionIndex, long
flushTimeout) throws Exception {
ResultPartitionWriter sender = createResultPartition(jobId,
partitionIds[partitionIndex], senderEnv, channels);
- return new RecordWriter<>(sender, new
RoundRobinChannelSelector<T>(), flushTimeout, null);
+ return new
RecordWriterBuilder().setTimeout(flushTimeout).build(sender);
}
private void generatePartitionIds() throws Exception {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index ad1de11..feb2317 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -77,7 +78,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
@Override
public void invoke() throws Exception {
- RecordWriter<SpeedTestRecord> writer = new
RecordWriter<>(getEnvironment().getWriter(0));
+ RecordWriter<SpeedTestRecord> writer = new
RecordWriterBuilder().build(getEnvironment().getWriter(0));
try {
// Determine the amount of data to send per
subtask
@@ -127,7 +128,7 @@ public class NetworkStackThroughputITCase extends
TestLogger {
SpeedTestRecord.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());
- RecordWriter<SpeedTestRecord> writer = new
RecordWriter<>(getEnvironment().getWriter(0));
+ RecordWriter<SpeedTestRecord> writer = new
RecordWriterBuilder().build(getEnvironment().getWriter(0));
try {
SpeedTestRecord record;