This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 35d1362  [FLINK-11885][network] Introduce RecordWriterBuilder for creating RecordWriter instance
35d1362 is described below

commit 35d13628427fe0d05ed79efce151cd63ee1eb911
Author: Zhijiang <[email protected]>
AuthorDate: Tue Mar 12 17:45:23 2019 +0800

    [FLINK-11885][network] Introduce RecordWriterBuilder for creating RecordWriter instance
---
 .../network/api/writer/BroadcastRecordWriter.java  |  2 +-
 .../io/network/api/writer/RecordWriter.java        | 30 +------------
 ...tRecordWriter.java => RecordWriterBuilder.java} | 49 ++++++++++++----------
 .../runtime/iterative/task/IterationHeadTask.java  |  3 +-
 .../apache/flink/runtime/operators/BatchTask.java  |  9 ++--
 .../io/network/api/writer/RecordWriterTest.java    | 17 ++++----
 .../SlotCountExceedingParallelismTest.java         |  3 +-
 .../scheduler/ScheduleOrUpdateConsumersTest.java   |  9 ++--
 .../jobmaster/TestingAbstractInvokables.java       |  3 +-
 .../TaskCancelAsyncProducerConsumerITCase.java     |  3 +-
 .../flink/runtime/taskmanager/TaskManagerTest.java |  3 +-
 .../apache/flink/runtime/jobmanager/Tasks.scala    |  5 ++-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  8 +++-
 .../StreamNetworkBenchmarkEnvironment.java         |  4 +-
 .../test/runtime/NetworkStackThroughputITCase.java |  5 ++-
 15 files changed, 72 insertions(+), 81 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 82cc167..effff59 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -32,7 +32,7 @@ import java.io.IOException;
  */
 public class BroadcastRecordWriter<T extends IOReadableWritable> extends 
RecordWriter<T> {
 
-       public BroadcastRecordWriter(
+       BroadcastRecordWriter(
                        ResultPartitionWriter writer,
                        ChannelSelector<T> channelSelector,
                        long timeout,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 8817a3f..1743576 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -87,15 +87,7 @@ public class RecordWriter<T extends IOReadableWritable> {
        /** To avoid synchronization overhead on the critical path, best-effort 
error tracking is enough here.*/
        private Throwable flusherException;
 
-       public RecordWriter(ResultPartitionWriter writer) {
-               this(writer, new RoundRobinChannelSelector<T>(), -1, null);
-       }
-
-       public RecordWriter(
-                       ResultPartitionWriter writer,
-                       ChannelSelector<T> channelSelector,
-                       long timeout,
-                       String taskName) {
+       RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> 
channelSelector, long timeout, String taskName) {
                this.targetPartition = writer;
                this.channelSelector = channelSelector;
                this.numberOfChannels = writer.getNumberOfSubpartitions();
@@ -310,28 +302,8 @@ public class RecordWriter<T extends IOReadableWritable> {
                }
        }
 
-       public static RecordWriter createRecordWriter(
-                       ResultPartitionWriter writer,
-                       ChannelSelector channelSelector,
-                       long timeout,
-                       String taskName) {
-               if (channelSelector.isBroadcast()) {
-                       return new BroadcastRecordWriter<>(writer, 
channelSelector, timeout, taskName);
-               } else {
-                       return new RecordWriter<>(writer, channelSelector, 
timeout, taskName);
-               }
-       }
-
-       public static RecordWriter createRecordWriter(
-                       ResultPartitionWriter writer,
-                       ChannelSelector channelSelector,
-                       String taskName) {
-               return createRecordWriter(writer, channelSelector, -1, 
taskName);
-       }
-
        // 
------------------------------------------------------------------------
 
-
        /**
         * A dedicated thread that periodically flushes the output buffers, to 
set upper latency bounds.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterBuilder.java
similarity index 51%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterBuilder.java
index 82cc167..79b372b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterBuilder.java
@@ -18,30 +18,37 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.core.io.IOReadableWritable;
-
-import java.io.IOException;
-
 /**
- * A special record-oriented runtime result writer only for broadcast mode.
- *
- * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and handles 
{@link #emit(IOReadableWritable)}
- * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more 
efficient way.
- *
- * @param <T> the type of the record that can be emitted with this record 
writer
+ * Utility class to encapsulate the logic of building a {@link RecordWriter} 
instance.
  */
-public class BroadcastRecordWriter<T extends IOReadableWritable> extends 
RecordWriter<T> {
-
-       public BroadcastRecordWriter(
-                       ResultPartitionWriter writer,
-                       ChannelSelector<T> channelSelector,
-                       long timeout,
-                       String taskName) {
-               super(writer, channelSelector, timeout, taskName);
+public class RecordWriterBuilder {
+
+       private ChannelSelector selector = new RoundRobinChannelSelector();
+
+       private long timeout = -1;
+
+       private String taskName = "test";
+
+       public RecordWriterBuilder setChannelSelector(ChannelSelector selector) 
{
+               this.selector = selector;
+               return this;
+       }
+
+       public RecordWriterBuilder setTimeout(long timeout) {
+               this.timeout = timeout;
+               return this;
+       }
+
+       public RecordWriterBuilder setTaskName(String taskName) {
+               this.taskName = taskName;
+               return this;
        }
 
-       @Override
-       public void emit(T record) throws IOException, InterruptedException {
-               broadcastEmit(record);
+       public RecordWriter build(ResultPartitionWriter writer) {
+               if (selector.isBroadcast()) {
+                       return new BroadcastRecordWriter(writer, selector, 
timeout, taskName);
+               } else {
+                       return new RecordWriter(writer, selector, timeout, 
taskName);
+               }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 9f9af36..2ee76fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
@@ -142,7 +143,7 @@ public class IterationHeadTask<X, Y, S extends Function, 
OT> extends AbstractIte
                        throw new Exception("Error: Inconsistent head task 
setup - wrong mapping of output gates.");
                }
                // now, we can instantiate the sync gate
-               this.toSync = new 
RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
+               this.toSync = new 
RecordWriterBuilder().build(getEnvironment().getWriter(syncGateIndex));
                this.toSyncPartitionId = 
getEnvironment().getWriter(syncGateIndex).getPartitionId();
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 864411c..5511d28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -1250,10 +1251,10 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                                oe = new OutputEmitter<T>(strategy, 
indexInSubtaskGroup, comparator, partitioner, dataDist);
                        }
 
-                       final RecordWriter<SerializationDelegate<T>> 
recordWriter = RecordWriter.createRecordWriter(
-                               task.getEnvironment().getWriter(outputOffset + 
i),
-                               oe,
-                               
task.getEnvironment().getTaskInfo().getTaskName());
+                       final RecordWriter<SerializationDelegate<T>> 
recordWriter = new RecordWriterBuilder()
+                               .setChannelSelector(oe)
+                               
.setTaskName(task.getEnvironment().getTaskInfo().getTaskName())
+                               
.build(task.getEnvironment().getWriter(outputOffset + i));
 
                        
recordWriter.setMetricGroup(task.getEnvironment().getMetricGroup().getIOMetricGroup());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index c6d0e02..9b496ba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -130,7 +130,7 @@ public class RecordWriterTest {
 
                        ResultPartitionWriter partitionWriter = new 
RecyclingPartitionWriter(bufferProvider);
 
-                       final RecordWriter<IntValue> recordWriter = new 
RecordWriter<IntValue>(partitionWriter);
+                       final RecordWriter<IntValue> recordWriter = new 
RecordWriterBuilder().build(partitionWriter);
 
                        Future<?> result = executor.submit(new Callable<Void>() 
{
                                @Override
@@ -182,7 +182,7 @@ public class RecordWriterTest {
                ResultPartitionWriter partitionWriter =
                        spy(new RecyclingPartitionWriter(new 
TestPooledBufferProvider(1, 16)));
 
-               RecordWriter<IntValue> recordWriter = new 
RecordWriter<>(partitionWriter);
+               RecordWriter<IntValue> recordWriter = new 
RecordWriterBuilder().build(partitionWriter);
 
                // Fill a buffer, but don't write it out.
                recordWriter.emit(new IntValue(0));
@@ -212,7 +212,7 @@ public class RecordWriterTest {
                TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
                ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
-               RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter);
+               RecordWriter<ByteArrayIO> writer = new 
RecordWriterBuilder().build(partitionWriter);
                CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                // No records emitted yet, broadcast should not request a buffer
@@ -249,7 +249,7 @@ public class RecordWriterTest {
                TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
                ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
-               RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter);
+               RecordWriter<ByteArrayIO> writer = new 
RecordWriterBuilder().build(partitionWriter);
                CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                // Emit records on some channels first (requesting buffers), 
then
@@ -311,7 +311,7 @@ public class RecordWriterTest {
 
                ResultPartitionWriter partition =
                        new CollectingPartitionWriter(queues, new 
TestPooledBufferProvider(Integer.MAX_VALUE));
-               RecordWriter<?> writer = new RecordWriter<>(partition);
+               RecordWriter<?> writer = new 
RecordWriterBuilder().build(partition);
 
                writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
 
@@ -373,7 +373,7 @@ public class RecordWriterTest {
 
                ResultPartitionWriter partition =
                        new CollectingPartitionWriter(queues, new 
TestPooledBufferProvider(Integer.MAX_VALUE));
-               RecordWriter<IntValue> writer = new RecordWriter<>(partition);
+               RecordWriter<IntValue> writer = new 
RecordWriterBuilder().build(partition);
 
                if (broadcastEvent) {
                        writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
@@ -415,7 +415,10 @@ public class RecordWriterTest {
                final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
                final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
                final ChannelSelector selector = new 
OutputEmitter(ShipStrategyType.BROADCAST, 0);
-               final RecordWriter<SerializationTestType> writer = 
RecordWriter.createRecordWriter(partitionWriter, selector, 0, "test");
+               final RecordWriter<SerializationTestType> writer = new 
RecordWriterBuilder()
+                       .setChannelSelector(selector)
+                       .setTimeout(0)
+                       .build(partitionWriter);
                final RecordDeserializer<SerializationTestType> deserializer = 
new SpillingAdaptiveSpanningRecordDeserializer<>(
                        new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 9b16e86..faf2dfe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -140,7 +141,7 @@ public class SlotCountExceedingParallelismTest extends 
TestLogger {
 
                @Override
                public void invoke() throws Exception {
-                       RecordWriter<IntValue> writer = new 
RecordWriter<>(getEnvironment().getWriter(0));
+                       RecordWriter<IntValue> writer = new 
RecordWriterBuilder().build(getEnvironment().getWriter(0));
                        final int numberOfTimesToSend = 
getTaskConfiguration().getInteger(CONFIG_KEY, 0);
 
                        final IntValue subtaskIndex = new IntValue(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index f382f04..58af0e2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -147,12 +148,8 @@ public class ScheduleOrUpdateConsumersTest extends 
TestLogger {
 
                        // The order of intermediate result creation in the job 
graph specifies which produced
                        // result partition is pipelined/blocking.
-                       final RecordWriter<IntValue> pipelinedWriter =
-                                       new 
RecordWriter<>(getEnvironment().getWriter(0));
-
-                       final RecordWriter<IntValue> blockingWriter =
-                                       new 
RecordWriter<>(getEnvironment().getWriter(1));
-
+                       final RecordWriter<IntValue> pipelinedWriter = new 
RecordWriterBuilder().build(getEnvironment().getWriter(0));
+                       final RecordWriter<IntValue> blockingWriter = new 
RecordWriterBuilder().build(getEnvironment().getWriter(1));
                        writers.add(pipelinedWriter);
                        writers.add(blockingWriter);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
index d227918..4e8d7eb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.types.IntValue;
 
@@ -44,7 +45,7 @@ public class TestingAbstractInvokables {
 
                @Override
                public void invoke() throws Exception {
-                       final RecordWriter<IntValue> writer = new 
RecordWriter<>(getEnvironment().getWriter(0));
+                       final RecordWriter<IntValue> writer = new 
RecordWriterBuilder().build(getEnvironment().getWriter(0));
 
                        try {
                                writer.emit(new IntValue(42));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index ddae846..85573af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -213,7 +214,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends 
TestLogger {
                        private final RecordWriter<LongValue> recordWriter;
 
                        public ProducerThread(ResultPartitionWriter 
partitionWriter) {
-                               this.recordWriter = new 
RecordWriter<>(partitionWriter);
+                               this.recordWriter = new 
RecordWriterBuilder().build(partitionWriter);
                        }
 
                        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index edeb5d1..937fb1e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -2093,7 +2094,7 @@ public class TaskManagerTest extends TestLogger {
                @Override
                public void invoke() throws Exception {
                        final Object o = new Object();
-                       RecordWriter<IntValue> recordWriter = new 
RecordWriter<>(getEnvironment().getWriter(0));
+                       RecordWriter<IntValue> recordWriter = new 
RecordWriterBuilder().build(getEnvironment().getWriter(0));
 
                        for (int i = 0; i < 1024; i++) {
                                recordWriter.emit(new IntValue(42));
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 472d15c..ea11f91 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import org.apache.flink.runtime.execution.Environment
 import org.apache.flink.runtime.io.network.api.reader.RecordReader
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter
+import org.apache.flink.runtime.io.network.api.writer.{RecordWriter, 
RecordWriterBuilder}
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 import org.apache.flink.types.IntValue
 
@@ -35,7 +35,8 @@ object Tasks {
         classOf[IntValue],
         getEnvironment.getTaskManagerInfo.getTmpDirectories)
       
-      val writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
+      val writer = new RecordWriterBuilder().build(
+        getEnvironment.getWriter(0)).asInstanceOf[RecordWriter[IntValue]]
 
       try {
         while (true) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 40e31dd..5bc4ca3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -1198,8 +1199,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        }
                }
 
-               RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
-                       RecordWriter.createRecordWriter(bufferWriter, 
outputPartitioner, bufferTimeout, taskName);
+               RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = 
new RecordWriterBuilder()
+                       .setChannelSelector(outputPartitioner)
+                       .setTimeout(bufferTimeout)
+                       .setTaskName(taskName)
+                       .build(bufferWriter);
                
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
                return output;
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index ed14db4..4acd1f3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -34,8 +34,8 @@ import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import 
org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
@@ -183,7 +183,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
 
        public RecordWriter<T> createRecordWriter(int partitionIndex, long 
flushTimeout) throws Exception {
                ResultPartitionWriter sender = createResultPartition(jobId, 
partitionIds[partitionIndex], senderEnv, channels);
-               return new RecordWriter<>(sender, new 
RoundRobinChannelSelector<T>(), flushTimeout, null);
+               return new 
RecordWriterBuilder().setTimeout(flushTimeout).build(sender);
        }
 
        private void generatePartitionIds() throws Exception {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index ad1de11..feb2317 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -77,7 +78,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 
                @Override
                public void invoke() throws Exception {
-                       RecordWriter<SpeedTestRecord> writer = new 
RecordWriter<>(getEnvironment().getWriter(0));
+                       RecordWriter<SpeedTestRecord> writer = new 
RecordWriterBuilder().build(getEnvironment().getWriter(0));
 
                        try {
                                // Determine the amount of data to send per 
subtask
@@ -127,7 +128,7 @@ public class NetworkStackThroughputITCase extends 
TestLogger {
                                        SpeedTestRecord.class,
                                        
getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
-                       RecordWriter<SpeedTestRecord> writer = new 
RecordWriter<>(getEnvironment().getWriter(0));
+                       RecordWriter<SpeedTestRecord> writer = new 
RecordWriterBuilder().build(getEnvironment().getWriter(0));
 
                        try {
                                SpeedTestRecord record;

Reply via email to