azagrebin commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r298927397
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 ##########
 @@ -160,33 +163,31 @@ public void close() {
                        super.close();
                }
        }
-       
-       /**
-        * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
-        * to be properly shut down when it is closed and its threads have ceased operation.
-        * 
-        * @return True, if the IO manager has properly shut down, false otherwise.
-        */
-       @Override
-       public boolean isProperlyShutDown() {
-               boolean readersShutDown = true;
-               for (ReaderThread rt : readers) {
-                       readersShutDown &= rt.getState() == Thread.State.TERMINATED;
-               }
-               
-               boolean writersShutDown = true;
-               for (WriterThread wt : writers) {
-                       writersShutDown &= wt.getState() == Thread.State.TERMINATED;
-               }
-               
-               return isShutdown.get() && readersShutDown && writersShutDown && super.isProperlyShutDown();
+
+       private static AutoCloseable getWriterThreadCloser(WriterThread thread) {
+               return () -> {
+                       try {
+                               thread.shutdown();
+                       } catch (Throwable t) {
+                               throw new IOException("Error while shutting down IO Manager writer thread.", t);
+                       }
+               };
        }
 
+       private static AutoCloseable getReaderThreadCloser(ReaderThread thread) {
+               return () -> {
+                       try {
+                               thread.shutdown();
+                       } catch (Throwable t) {
+                               throw new IOException("Error while shutting down IO Manager reader thread.", t);
+                       }
+               };
+       }
 
        @Override
        public void uncaughtException(Thread t, Throwable e) {
                LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e);
-               close();
+               IOUtils.closeQuietly(this);
 
 Review comment:
   I think we should also log the exception thrown while closing here, since `IOUtils.closeQuietly` would otherwise swallow it.
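
   Roughly what I have in mind, as a minimal standalone sketch rather than a concrete patch. The class `CloseAndLogSketch` and the helper `closeAndLog` are hypothetical names used only for illustration, and `java.util.logging` stands in for the logger the class actually uses:

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CloseAndLogSketch {

    // Stand-in logger; the real class would use its existing LOG field.
    private static final Logger LOG = Logger.getLogger(CloseAndLogSketch.class.getName());

    /**
     * Hypothetical helper: closes the resource and logs any failure
     * instead of dropping it silently. Returns the caught failure
     * (or null on success) so callers and tests can inspect it.
     */
    public static Throwable closeAndLog(AutoCloseable closeable, String description) {
        try {
            if (closeable != null) {
                closeable.close();
            }
            return null;
        } catch (Throwable t) {
            LOG.log(Level.SEVERE, "Error while closing " + description + ".", t);
            return t;
        }
    }

    public static void main(String[] args) {
        // Simulates a close() that fails, as a thread closer might during shutdown.
        AutoCloseable failing = () -> {
            throw new IOException("simulated shutdown failure");
        };
        Throwable failure = closeAndLog(failing, "I/O Manager");
        System.out.println("close failed: " + (failure != null));
    }
}
```

   In `uncaughtException` this would amount to wrapping `close()` in a try/catch and passing the caught exception to `LOG.error`, so that both the original thread failure and any shutdown failure end up in the log.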
