abstractdog commented on code in PR #263:
URL: https://github.com/apache/tez/pull/263#discussion_r1104266767
##########
tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java:
##########
@@ -129,14 +138,69 @@ public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
int idx = 0;
long progress;
RecordReader<K, V> curReader;
-
+ private final AtomicInteger initIndex;
+ private final int numReaders;
+ private ExecutorService initReaderExecService;
+ private BlockingDeque<Future<RecordReader<K, V>>> initedReaders;
+ private AtomicBoolean failureOccurred = new AtomicBoolean(false);
+
public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
Reporter reporter) throws IOException {
this.groupedSplit = split;
this.job = job;
this.reporter = reporter;
+ this.initIndex = new AtomicInteger(0);
+ int numThreads = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
+ TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT);
+ this.numReaders = Math.min(groupedSplit.wrappedSplits.size(),
+ conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS,
+ TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT));
+ // skip multi-threaded split opening when number of readers is less than 1
Review Comment:
Sorry, I just noticed this: "skip ... less than 1" is not quite accurate,
because numReaders=1 also leads to skipping. This is important here, so:
only init the executor service if numReaders is greater than 1.
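To illustrate the condition, here is a minimal standalone sketch, not the PR's actual code. The class `InitPoolSketch` and the method `maybeCreateInitPool` are hypothetical names; only `numReaders` and `numThreads` come from the diff. It assumes that a `null` pool means "open the readers synchronously", as the `initReaderExecService == null` check in `initNextRecordReader()` suggests.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InitPoolSketch {

  /**
   * Hypothetical helper: returns a thread pool only when asynchronous
   * opening can help, otherwise null (meaning "open readers synchronously").
   */
  public static ExecutorService maybeCreateInitPool(int numReaders, int numThreads) {
    // numReaders == 1 is skipped as well, so the guard is "> 1", not ">= 1"
    if (numReaders > 1) {
      return Executors.newFixedThreadPool(Math.max(1, numThreads));
    }
    return null;
  }

  public static void main(String[] args) {
    for (int n = 0; n <= 2; n++) {
      ExecutorService pool = maybeCreateInitPool(n, 2);
      System.out.println("numReaders=" + n + " async=" + (pool != null));
      if (pool != null) {
        pool.shutdownNow();
      }
    }
  }
}
```

With this shape, the code comment could read "only init the executor service if numReaders is greater than 1", which matches what the guard does.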
##########
tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java:
##########
@@ -183,23 +247,45 @@ protected boolean initNextRecordReader() throws IOException {
// if all chunks have been processed, nothing more to do.
if (idx == groupedSplit.wrappedSplits.size()) {
+ if (initReaderExecService != null) {
+ LOG.info("Shutting down the init record reader threadpool");
+ initReaderExecService.shutdownNow();
+ }
return false;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Init record reader for index " + idx + " of " +
+ LOG.debug("Init record reader for index " + idx + " of " +
groupedSplit.wrappedSplits.size());
}
// get a record reader for the idx-th chunk
try {
- curReader = wrappedInputFormat.getRecordReader(
- groupedSplit.wrappedSplits.get(idx), job, reporter);
+ // get the cur reader directly when async split opening is disabled
+ if (initReaderExecService == null) {
+ curReader = wrappedInputFormat.getRecordReader(groupedSplit.wrappedSplits.get(idx), job, reporter);
+ } else {
+ preInitReaders();
+ curReader = initedReaders.take().get();
+ }
} catch (Exception e) {
- throw new RuntimeException (e);
+ failureOccurred.set(true);
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ if (initedReaders != null) {
+ cancelsFutures();
+ }
+ throw new RuntimeException(e);
}
idx++;
- return true;
+ return curReader != null;
+ }
+
+ private void cancelsFutures() {
Review Comment:
nit: we don't use the third person in method names that reflect an action, so
use "cancelFutures" instead
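For reference, a renamed helper could look roughly like the sketch below. The body of the method is not visible in this hunk, so the draining logic, the return value, and the class name `CancelFuturesSketch` are assumptions made for illustration only.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

public class CancelFuturesSketch {

  /**
   * Hypothetical body: drains the deque and cancels every pending future.
   * Returns how many futures were actually cancelled.
   */
  public static <T> int cancelFutures(BlockingDeque<Future<T>> futures) {
    int cancelled = 0;
    Future<T> future;
    // poll() returns null once the deque is empty, so this never blocks
    while ((future = futures.poll()) != null) {
      // cancel(true) returns false for futures that already completed
      if (future.cancel(true)) {
        cancelled++;
      }
    }
    return cancelled;
  }

  public static void main(String[] args) {
    BlockingDeque<Future<String>> pending = new LinkedBlockingDeque<>();
    pending.add(new CompletableFuture<String>());            // still pending
    pending.add(CompletableFuture.completedFuture("done"));  // already finished
    System.out.println("cancelled=" + cancelFutures(pending));
  }
}
```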
##########
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java:
##########
@@ -102,6 +103,20 @@ public abstract class TezSplitGrouper {
public static final String TEZ_GROUPING_NODE_LOCAL_ONLY =
"tez.grouping.node.local.only";
public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;
+ /**
+ * Number of threads used to initialize the grouped splits, to asynchronously open the readers.
+ */
+ public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads";
Review Comment:
This option should be consistent with the other one; both of them configure
the number of something, so:
```
tez.grouping.split.init.threads
tez.grouping.split.init.recordreaders
```
or:
```
tez.grouping.split.init.num-threads
tez.grouping.split.init.num-recordreaders
```
I prefer the first; we don't really use "num" in TezConfiguration either
(only in one place), and I believe it's redundant.
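As a rough sketch of the first naming scheme, the constants could be declared as below. This is standalone illustration code, not Tez's: the class `GroupingKeysSketch`, the constant name `TEZ_GROUPING_SPLIT_INIT_RECORDREADERS`, the `getInt` helper, and the fallback values are all hypothetical, and `java.util.Properties` stands in for the real configuration object only to keep the example self-contained.

```java
import java.util.Properties;

public class GroupingKeysSketch {

  // Key names follow the first suggestion: dotted segments, no "num" prefix
  public static final String TEZ_GROUPING_SPLIT_INIT_THREADS =
      "tez.grouping.split.init.threads";
  public static final String TEZ_GROUPING_SPLIT_INIT_RECORDREADERS =
      "tez.grouping.split.init.recordreaders";

  /** Hypothetical lookup helper: returns the fallback when the key is unset. */
  public static int getInt(Properties props, String key, int fallback) {
    String value = props.getProperty(key);
    return value == null ? fallback : Integer.parseInt(value.trim());
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(TEZ_GROUPING_SPLIT_INIT_THREADS, "4");
    // The fallback values here are placeholders, not the PR's defaults
    System.out.println(getInt(props, TEZ_GROUPING_SPLIT_INIT_THREADS, 1));
    System.out.println(getInt(props, TEZ_GROUPING_SPLIT_INIT_RECORDREADERS, 1));
  }
}
```

Both keys then share the `tez.grouping.split.init.` prefix, so they sort together and read as two settings of the same feature.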