abstractdog commented on code in PR #263:
URL: https://github.com/apache/tez/pull/263#discussion_r1104266767


##########
tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java:
##########
@@ -129,14 +138,69 @@ public class TezGroupedSplitsRecordReader implements 
RecordReader<K, V> {
     int idx = 0;
     long progress;
     RecordReader<K, V> curReader;
-    
+    private final AtomicInteger initIndex;
+    private final int numReaders;
+    private ExecutorService initReaderExecService;
+    private BlockingDeque<Future<RecordReader<K, V>>> initedReaders;
+    private AtomicBoolean failureOccurred = new AtomicBoolean(false);
+
     public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
         Reporter reporter) throws IOException {
       this.groupedSplit = split;
       this.job = job;
       this.reporter = reporter;
+      this.initIndex = new AtomicInteger(0);
+      int numThreads = 
conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
+          TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT);
+      this.numReaders = Math.min(groupedSplit.wrappedSplits.size(),
+          
conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS,
+              
TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT));
+      // skip multi-threaded split opening when number of readers is less than 
1

Review Comment:
   sorry, I just found this: "skip ... less than 1" is not 100% true, 
numReaders=1 will also lead to skipping, this is important here, so only init 
the executor service if numReaders are greater than 1



##########
tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java:
##########
@@ -183,23 +247,45 @@ protected boolean initNextRecordReader() throws 
IOException {
 
       // if all chunks have been processed, nothing more to do.
       if (idx == groupedSplit.wrappedSplits.size()) {
+        if (initReaderExecService != null) {
+          LOG.info("Shutting down the init record reader threadpool");
+          initReaderExecService.shutdownNow();
+        }
         return false;
       }
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Init record reader for index " + idx + " of " + 
+        LOG.debug("Init record reader for index " + idx + " of " +
                   groupedSplit.wrappedSplits.size());
       }
 
       // get a record reader for the idx-th chunk
       try {
-        curReader = wrappedInputFormat.getRecordReader(
-            groupedSplit.wrappedSplits.get(idx), job, reporter);
+        // get the cur reader directly when async split opening is disabled
+        if (initReaderExecService == null) {
+          curReader = 
wrappedInputFormat.getRecordReader(groupedSplit.wrappedSplits.get(idx), job, 
reporter);
+        } else {
+          preInitReaders();
+          curReader = initedReaders.take().get();
+        }
       } catch (Exception e) {
-        throw new RuntimeException (e);
+        failureOccurred.set(true);
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        if (initedReaders != null) {
+          cancelsFutures();
+        }
+        throw new RuntimeException(e);
       }
       idx++;
-      return true;
+      return curReader != null;
+    }
+
+    private void cancelsFutures() {

Review Comment:
   nit: we don't use third person in method names that reflects an action, so 
instead "cancelFutures"



##########
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java:
##########
@@ -102,6 +103,20 @@ public abstract class TezSplitGrouper {
   public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = 
"tez.grouping.node.local.only";
   public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;
 
+  /**
+   * Number of threads used to initialize the grouped splits, to 
asynchronously open the readers.
+   */
+  public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = 
"tez.grouping.split.init-threads";

Review Comment:
   this option should be in line with the other, both of them are about to 
configure the number of something, so:
   ```
   tez.grouping.split.init.threads
   tez.grouping.split.init.recordreaders
   ```
   or:
   ```
   tez.grouping.split.init.num-threads
   tez.grouping.split.init.num-recordreaders
   ```
   
   I prefer the first, we don't really use "num" in TezConfiguration either 
(only at 1 place), it's redundant I believe



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to