azagrebin commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r298927259
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 ##########
 @@ -126,22 +131,20 @@ public void close() {
                        // close writing and reading threads with best effort and log problems
                        // first notify all to close, then wait until all are closed
 
-                       for (WriterThread wt : writers) {
-                               try {
-                                       wt.shutdown();
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error while shutting down IO Manager writer thread.", t);
-                               }
-                       }
-                       for (ReaderThread rt : readers) {
-                               try {
-                                       rt.shutdown();
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error while shutting down IO Manager reader thread.", t);
-                               }
-                       }
+                       Iterable<AutoCloseable> writerThreadCloseables = Arrays.stream(writers)
+                               .filter(Objects::nonNull)
+                               .filter(Thread::isAlive)
+                               .map(IOManagerAsync::getWriterThreadCloser)
+                               .collect(Collectors.toList());
+
+                       Iterable<AutoCloseable> readerThreadCloseables = Arrays.stream(readers)
+                               .filter(Objects::nonNull)
+                               .filter(Thread::isAlive)
+                               .map(IOManagerAsync::getReaderThreadCloser)
+                               .collect(Collectors.toList());
+
+                       IOUtils.closeAll(writerThreadCloseables, readerThreadCloseables);
 
 Review comment:
   Instead of modifying the `IOUtils.closeAll` method, I would suggest the following approach:
   ```
   // close writing and reading threads with best effort and log problems
   // first notify all to close, then wait until all are closed

   List<AutoCloseable> closables = new ArrayList<>(2 * writers.length + 2 * readers.length + 1);

   Arrays.stream(writers)
           .filter(Objects::nonNull)
           .filter(Thread::isAlive)
           .map(IOManagerAsync::getWriterThreadCloser)
           .forEach(closables::add);

   Arrays.stream(readers)
           .filter(Objects::nonNull)
           .filter(Thread::isAlive)
           .map(IOManagerAsync::getReaderThreadCloser)
           .forEach(closables::add);

   for (WriterThread wt : writers) {
           closables.add(wt::join);
   }

   for (ReaderThread rt : readers) {
           closables.add(rt::join);
   }

   // make sure we call the super implementation in any case and at the last point,
   // because this will clean up the I/O directories
   closables.add(super::close);

   IOUtils.closeAll(closables);
   ```
   This way we also try to close everything (thread shutdown, `join`, and `super.close()`), even if one of the earlier steps fails.
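
To illustrate the idea outside of Flink, here is a minimal, self-contained sketch. The `closeAll` helper below is a hypothetical stand-in written for this example (it is not the actual `IOUtils.closeAll` code), and `CloseAllSketch`, `demo`, and the step names are made up for illustration. It assumes the helper closes every element, rethrows the first failure, and attaches later failures as suppressed exceptions.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseAllSketch {

    /**
     * Hypothetical stand-in for a close-all helper: closes every element,
     * keeps the first failure, and adds any later failures as suppressed.
     */
    static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {
        Exception collected = null;
        for (AutoCloseable closeable : closeables) {
            try {
                if (closeable != null) {
                    closeable.close();
                }
            } catch (Exception e) {
                if (collected == null) {
                    collected = e;
                } else {
                    collected.addSuppressed(e);
                }
            }
        }
        if (collected != null) {
            throw collected;
        }
    }

    /** Runs shutdown, join, and cleanup through one list; returns the steps that ran. */
    static List<String> demo() {
        List<String> steps = new ArrayList<>();
        Thread worker = new Thread(() -> { });
        worker.start();

        List<AutoCloseable> closeables = new ArrayList<>();
        // Step 1: a shutdown that fails; later steps should still run.
        closeables.add(() -> {
            steps.add("shutdown");
            throw new IllegalStateException("shutdown failed");
        });
        // Step 2: Thread::join fits AutoCloseable because close() may throw Exception.
        closeables.add(worker::join);
        closeables.add(() -> steps.add("joined"));
        // Step 3: the final cleanup, added last so it always runs at the end.
        closeables.add(() -> steps.add("cleanup"));

        try {
            closeAll(closeables);
        } catch (Exception e) {
            steps.add("error: " + e.getMessage());
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the failing shutdown does not prevent the join or the cleanup from running, and the failure is only reported after the last step, which is the behavior the single-list approach relies on.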
