[hotfix][tests] Properly close StreamRecordWriter in network benchmarks

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6526fb7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6526fb7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6526fb7

Branch: refs/heads/master
Commit: c6526fb7dacedac9cc07a971c5553ba86abcf20f
Parents: 89605ad
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Mon Jan 29 13:00:33 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:35 2018 +0100

----------------------------------------------------------------------
 .../runtime/io/benchmark/LongRecordWriterThread.java  | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6526fb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
index e6cc2d5..7336b6b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io.benchmark;
 
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.types.LongValue;
 
 import java.io.IOException;
@@ -33,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * records.
  */
 public class LongRecordWriterThread extends CheckedThread {
-       private final RecordWriter<LongValue> recordWriter;
+       private final StreamRecordWriter<LongValue> recordWriter;
 
        /**
         * Future to wait on a definition of the number of records to send.
@@ -42,7 +43,7 @@ public class LongRecordWriterThread extends CheckedThread {
 
        private volatile boolean running = true;
 
-       public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
+       public LongRecordWriterThread(StreamRecordWriter<LongValue> 
recordWriter) {
                this.recordWriter = checkNotNull(recordWriter);
        }
 
@@ -74,8 +75,13 @@ public class LongRecordWriterThread extends CheckedThread {
 
        @Override
        public void go() throws Exception {
-               while (running) {
-                       sendRecords(getRecordsToSend().get());
+               try {
+                       while (running) {
+                               sendRecords(getRecordsToSend().get());
+                       }
+               }
+               finally {
+                       recordWriter.close();
                }
        }
 

Reply via email to