This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f4f4510  [FLINK-12570][network] Work against ResultPartitionWriter interface
f4f4510 is described below

commit f4f451037c9bab96e241a2a4e13119b1c3bbf21c
Author: azagrebin <[email protected]>
AuthorDate: Fri May 24 09:38:51 2019 +0200

    [FLINK-12570][network] Work against ResultPartitionWriter interface
    
    This is part of the Shuffle API refactoring: make the task not depend on
    the concrete implementation of ResultPartitionWriter (ResultPartition).
---
 .../network/api/writer/ResultPartitionWriter.java  | 19 +++++++++++++++++++
 .../io/network/partition/ResultPartition.java      |  2 ++
 .../org/apache/flink/runtime/taskmanager/Task.java | 11 +++++------
 .../AbstractCollectingResultPartitionWriter.java   | 11 +++++++++++
 .../io/network/api/writer/RecordWriterTest.java    | 22 ++++++++++++++++++++++
 5 files changed, 59 insertions(+), 6 deletions(-)
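
As a side note, a minimal sketch of what working against the interface buys:
helper code like the one below compiles against ResultPartitionWriter only, so
it serves the production ResultPartition as well as test writers such as
AbstractCollectingResultPartitionWriter without change. The names
PartitionWriterUtil and finishAll are illustrative and not part of this commit.

    import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;

    import java.io.IOException;

    final class PartitionWriterUtil {

        // Finish all produced partitions through the interface; callers never
        // need to know the concrete ResultPartitionWriter implementation.
        static void finishAll(ResultPartitionWriter[] producedPartitions) throws IOException {
            for (ResultPartitionWriter partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }
        }
    }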

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 49f74af..153b880 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
@@ -65,4 +67,21 @@ public interface ResultPartitionWriter extends AutoCloseable {
         * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition.
         */
        void flush(int subpartitionIndex);
+
+       /**
+        * Fail the production of the partition.
+        *
+        * <p>This method propagates non-{@code null} failure causes to consumers on a best-effort basis.
+        * Closing of partition is still needed.
+        *
+        * @param throwable failure cause
+        */
+       void fail(@Nullable Throwable throwable);
+
+       /**
+        * Successfully finish the production of the partition.
+        *
+        * <p>Closing of partition is still needed.
+        */
+       void finish() throws IOException;
 }
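
To make the new contract concrete, a hedged sketch (not code from this commit)
of how a producer is expected to drive these methods: finish() on success,
fail(cause) on failure, and close() in either case, since closing the partition
is still needed. ProducerLifecycleSketch and emitRecords are made-up names used
only for illustration.

    import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;

    final class ProducerLifecycleSketch {

        static void produce(ResultPartitionWriter writer) throws Exception {
            try {
                emitRecords(writer);   // hypothetical placeholder for record emission
                writer.finish();       // successfully finish the production
            } catch (Throwable t) {
                writer.fail(t);        // best-effort propagation of the cause to consumers
                throw t;
            } finally {
                writer.close();        // closing is still required after finish()/fail()
            }
        }

        private static void emitRecords(ResultPartitionWriter writer) {
            // stand-in for writing records/buffers into the partition
        }
    }
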
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index ca3855d..15f15e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -256,6 +256,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
         *
         * <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
         */
+       @Override
        public void finish() throws IOException {
                boolean success = false;
 
@@ -313,6 +314,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
                }
        }
 
+       @Override
        public void fail(@Nullable Throwable throwable) {
                partitionManager.releasePartition(partitionId, throwable);
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1d95231..6742346b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -52,7 +52,6 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -190,7 +189,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
        /** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */
        private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
-       private final ResultPartition[] producedPartitions;
+       private final ResultPartitionWriter[] producedPartitions;
 
        private final SingleInputGate[] inputGates;
 
@@ -598,7 +597,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 
                        setupPartionsAndGates(producedPartitions, inputGates);
 
-                       for (ResultPartition partition : producedPartitions) {
+                       for (ResultPartitionWriter partition : producedPartitions) {
                                taskEventDispatcher.registerPartition(partition.getPartitionId());
                        }
 
@@ -688,7 +687,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
                        // ----------------------------------------------------------------
 
                        // finish the produced partitions. if this fails, we consider the execution failed.
-                       for (ResultPartition partition : producedPartitions) {
+                       for (ResultPartitionWriter partition : producedPartitions) {
                                if (partition != null) {
                                        partition.finish();
                                }
@@ -845,7 +844,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
        private void releaseNetworkResources() {
                LOG.debug("Release task {} network resources (state: {}).", taskNameWithSubtask, getExecutionState());
 
-               for (ResultPartition partition : producedPartitions) {
+               for (ResultPartitionWriter partition : producedPartitions) {
                        taskEventDispatcher.unregisterPartition(partition.getPartitionId());
                        if (isCanceledOrFailed()) {
                                partition.fail(getFailureCause());
@@ -860,7 +859,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
         * release partitions and gates. Another is from task thread during task exiting.
         */
        private void closeNetworkResources() {
-               for (ResultPartition partition : producedPartitions) {
+               for (ResultPartitionWriter partition : producedPartitions) {
                        try {
                                partition.close();
                        } catch (Throwable t) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index fd38ee8..8ae8f5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
@@ -109,5 +110,15 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
        public void close() {
        }
 
+       @Override
+       public void fail(@Nullable Throwable throwable) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void finish() {
+               throw new UnsupportedOperationException();
+       }
+
        protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 956c4e2..35487b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -55,6 +55,8 @@ import org.junit.rules.TemporaryFolder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -511,6 +513,16 @@ public class RecordWriterTest {
                }
 
                @Override
+               public void fail(@Nullable Throwable throwable) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void finish() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public void close() {
                }
        }
@@ -576,6 +588,16 @@ public class RecordWriterTest {
                }
 
                @Override
+               public void fail(@Nullable Throwable throwable) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void finish() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public void close() {
                }
        }
