kbendick commented on a change in pull request #2803:
URL: https://github.com/apache/iceberg/pull/2803#discussion_r667567397
##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -205,35 +215,62 @@ public StructType readSchema() {
// broadcast the table metadata as input partitions will be sent to executors
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
- List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
- for (CombinedScanTask task : tasks()) {
- readTasks.add(new ReadTask<>(
- task, tableBroadcast, expectedSchemaString, caseSensitive,
- localityPreferred, new BatchReaderFactory(batchSize)));
+ int taskSize = tasks().size();
+ InputPartition<ColumnarBatch>[] readTasks = new InputPartition[taskSize];
+ Long startTime = System.currentTimeMillis();
+ try {
+ pool.submit(() -> IntStream.range(0, taskSize).parallel()
Review comment:
Should the construction of all read tasks be done in a single `submit`
to the thread pool? I don’t see any way to slow the parallelism down here so as
to not potentially overwhelm the name node.
For example, I would have expected that each of the ranges in the int stream
was submitted to the pool individually, so that tasks queue up waiting for
their turn; a sketch of what I mean is below. Here, it looks like the
parallelism is rather unbounded. Totally open to reading this wrong (it is
Sunday for me after all!).
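For concreteness, here is a minimal sketch of the bounded, per-task submission
I had in mind, using a plain fixed-size `ExecutorService`. Names like
`createReadTask`, `ScanTask`, and `ReadTask` are hypothetical stand-ins for
the real types and methods in `Reader.java`, not the actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BoundedReadTaskPlanner {

  interface ReadTask {}  // stand-in for InputPartition<ColumnarBatch>
  interface ScanTask {}  // stand-in for CombinedScanTask

  // Hypothetical factory for a single read task; assumed to do the
  // name-node-touching work (e.g. locality lookups).
  static ReadTask createReadTask(ScanTask task) {
    return new ReadTask() {};
  }

  static List<ReadTask> planReadTasks(List<ScanTask> scanTasks, int poolSize)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      // One submission per scan task: queued submissions wait for a free
      // thread, so at most poolSize read tasks are constructed concurrently.
      List<Future<ReadTask>> futures = new ArrayList<>(scanTasks.size());
      for (ScanTask task : scanTasks) {
        futures.add(pool.submit(() -> createReadTask(task)));
      }
      List<ReadTask> readTasks = new ArrayList<>(scanTasks.size());
      for (Future<ReadTask> future : futures) {
        readTasks.add(future.get());
      }
      return readTasks;
    } finally {
      pool.shutdown();
    }
  }
}
```

With per-task submission like this, the name node sees at most `poolSize`
concurrent requests, whereas a single `submit` of a parallel stream leaves
the effective fan-out up to the stream's pool.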
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]