cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r920548705
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
Review Comment:
`shutdown` does not wait for the submitted tasks to finish, so when `close()` returns, there is no guarantee that closing has actually completed. We'll need a call to `awaitTermination` to make sure all tasks have finished running.
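
Something along these lines could work as a sketch (the timeout value is only illustrative, and it would need a `java.util.concurrent.TimeUnit` import):

```java
} finally {
  executorService.shutdown();
  try {
    // Block until every close task has actually finished; the timeout is illustrative.
    if (!executorService.awaitTermination(10, TimeUnit.MINUTES)) {
      executorService.shutdownNow();
    }
  } catch (InterruptedException e) {
    executorService.shutdownNow();
    Thread.currentThread().interrupt();
  }
}
```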
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
Review Comment:
Is this line left over from some manual testing?
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
Review Comment:
I suggest making this configurable, with 10 as the default.
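
For example, assuming the wrapped `JobConf` is reachable here (the property name below is just a placeholder, not an existing key):

```java
// Placeholder key, shown for illustration only; 10 stays as the default.
int nThreads = conf.getInt(
    "mapreduce.multiple-outputs.close.num-threads", 10);
```

Whatever key is chosen should probably also get documented in mapred-default.xml.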
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
Review Comment:
We know that we will generate exactly one callable for each `RecordWriter`. We can create the `ArrayList` pre-allocated to exactly the correct size, potentially avoiding reallocation inefficiencies: `new ArrayList<>(recordWriters.size())`
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
Review Comment:
You can log a warning here that closing was interrupted.
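
For example, assuming a `LOG` is available (or added) in this class:

```java
} catch (InterruptedException e) {
  // Make the interruption visible before restoring the interrupt status.
  LOG.warn("Interrupted while waiting for record writers to close.", e);
  Thread.currentThread().interrupt();
}
```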
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
Review Comment:
I recommend using the version of this method that accepts a `ThreadFactory`, probably built with `ThreadFactoryBuilder`. The factory should generate threads that 1) use a naming format that makes it clear these threads are related to the closing process (e.g. "MultipleOutputs-close"), and 2) set an `UncaughtExceptionHandler` that logs the exception, which would make unexpected errors such as unchecked exceptions visible.
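
Roughly something like the sketch below, assuming Guava's `ThreadFactoryBuilder` (Hadoop's shaded copy lives under `org.apache.hadoop.thirdparty.com.google.common.util.concurrent`) and a `LOG` in this class; the name format is only a suggestion:

```java
ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MultipleOutputs-close-%d")
    .setUncaughtExceptionHandler((thread, throwable) ->
        // Surface unexpected errors such as unchecked exceptions.
        LOG.error("Uncaught exception in thread " + thread.getName(), throwable))
    .build();
ExecutorService executorService =
    Executors.newFixedThreadPool(nThreads, threadFactory);
```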
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
+ }
+
+ if (ioException.get() != null) {
+ throw new IOException(ioException.get());
Review Comment:
With this approach, if multiple record writers throw an exception during
close, we'll only get visibility into one of them. I'd like to suggest a
slightly different approach. Within the callable, catch the exception, log it
immediately and flag an `AtomicBoolean`. Then, on this line, if that
`AtomicBoolean` is set, throw an `IOException` from the overall method, with a
message like "One or more threads encountered IOException during close. See
prior errors."