advancedxy commented on code in PR #744:
URL: https://github.com/apache/incubator-uniffle/pull/744#discussion_r1142861169
##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java:
##########
@@ -80,13 +89,13 @@ public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception
if (queue.isEmpty()) {
LOGGER.warn("No free hdfs writer handler, it will wait. storage path: {}", basePath);
}
- HdfsShuffleWriteHandler writeHandler = queue.take();
+ ShuffleWriteHandler writeHandler = queue.take();
try {
writeHandler.write(shuffleBlocks);
} finally {
- // Use add() here because we are sure the capacity will not be exceeded.
- // Note: add() throws IllegalStateException when queue is full.
- queue.add(writeHandler);
+ // Use addFirst() here because we are sure the capacity will not be exceeded.
Review Comment:
I don't think you've addressed @jerqi's comment. Adding a Javadoc for this class is good, but it still doesn't explain why we use `addFirst()` here.
I also believe the comment here is out of date, since it was written for the `add()` method.
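A minimal sketch of the behavior the Javadoc relies on, written against `java.util.concurrent.LinkedBlockingDeque` only. It is not the Uniffle code: handlers are stood in for by integer indices, and `simulate` is a hypothetical helper. `take()` removes from the head of the deque, so returning a handler with `addFirst()` makes it the next one taken, and sequential, uncontended writes keep reusing one handler (one HDFS file). Returning it with `add()`, which is `addLast()`, rotates through every handler in the pool and spreads the writes over more files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

public class DequeReuseDemo {

  // Simulates `rounds` sequential, uncontended writes against a pool of
  // `poolSize` handlers (hypothetical: represented by their index) and
  // records which handler served each write.
  static List<Integer> simulate(int poolSize, int rounds, boolean returnToHead) {
    LinkedBlockingDeque<Integer> pool = new LinkedBlockingDeque<>(poolSize);
    for (int i = 0; i < poolSize; i++) {
      pool.add(i);
    }
    List<Integer> used = new ArrayList<>();
    for (int r = 0; r < rounds; r++) {
      Integer handler;
      try {
        handler = pool.take(); // take() removes the head (first element) of the deque
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
      try {
        used.add(handler); // stands in for writeHandler.write(shuffleBlocks)
      } finally {
        if (returnToHead) {
          pool.addFirst(handler); // LIFO: this handler is taken again next time
        } else {
          pool.add(handler); // add() == addLast(): FIFO rotation through the pool
        }
      }
    }
    return used;
  }

  public static void main(String[] args) {
    System.out.println("addFirst: " + simulate(3, 5, true));
    System.out.println("add:      " + simulate(3, 5, false));
  }
}
```

With three handlers and five sequential writes, the `addFirst()` run prints `[0, 0, 0, 0, 0]` and the `add()` run prints `[0, 1, 2, 0, 1]`. That reuse of the most recently returned handler is the reason to prefer `addFirst()`. The capacity remark still holds: `addFirst()`, like `add()`, throws `IllegalStateException` when the deque is full, which cannot happen here because a handler is only put back after it was taken.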
##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java:
##########
@@ -29,13 +30,29 @@
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+/**
+ * The {@link PooledHdfsShuffleWriteHandler} is a wrapper of underlying multiple
+ * {@link HdfsShuffleWriteHandler} to support concurrency control of writing single
+ * partition to multi files.
+ *
+ * By leveraging {@link LinkedBlockingDeque}, it will always write the same file when
+ * no race condition, which is good for reducing file numbers for HDFS.
+ */
public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(PooledHdfsShuffleWriteHandler.class);
- private final BlockingQueue<HdfsShuffleWriteHandler> queue;
+ private final LinkedBlockingDeque<ShuffleWriteHandler> queue;
private final int maxConcurrency;
private final String basePath;
+ // Only for tests
+ @VisibleForTesting
+ public PooledHdfsShuffleWriteHandler(LinkedBlockingDeque<ShuffleWriteHandler> queue) {
+ this.queue = queue;
+ this.maxConcurrency = queue.size();
Review Comment:
nit: extra space.
```suggestion
this.maxConcurrency = queue.size();
```
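The test-only constructor lets a test inject a pre-built deque, with `maxConcurrency` taken from its size. Below is a hedged sketch of how such a test could check the concurrency bound. `BlockWriter`, `PooledWriter`, and `maxInFlight` are hypothetical, simplified stand-ins for `ShuffleWriteHandler` and `PooledHdfsShuffleWriteHandler` (blocks are plain strings and the fake handlers just sleep), not the real Uniffle classes.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PooledWriterSketch {

  // Hypothetical stand-in for ShuffleWriteHandler; blocks are plain strings here.
  interface BlockWriter {
    void write(List<String> blocks) throws Exception;
  }

  // Hypothetical, simplified stand-in for PooledHdfsShuffleWriteHandler.
  static class PooledWriter implements BlockWriter {
    private final LinkedBlockingDeque<BlockWriter> queue;
    private final int maxConcurrency;

    // Mirrors the test-only constructor: concurrency equals the number of injected handlers.
    PooledWriter(LinkedBlockingDeque<BlockWriter> queue) {
      this.queue = queue;
      this.maxConcurrency = queue.size();
    }

    int maxConcurrency() {
      return maxConcurrency;
    }

    @Override
    public void write(List<String> blocks) throws Exception {
      BlockWriter writer = queue.take(); // blocks while every handler is busy
      try {
        writer.write(blocks);
      } finally {
        queue.addFirst(writer); // cannot overflow: only returns what was taken
      }
    }
  }

  // Runs `tasks` concurrent writes through a pool of `poolSize` fake handlers and
  // returns the highest number of writes observed in flight at the same time.
  static int maxInFlight(int poolSize, int tasks) {
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger maxSeen = new AtomicInteger();
    LinkedBlockingDeque<BlockWriter> deque = new LinkedBlockingDeque<>(poolSize);
    for (int i = 0; i < poolSize; i++) {
      deque.add(blocks -> {
        int now = inFlight.incrementAndGet();
        maxSeen.accumulateAndGet(now, Math::max);
        Thread.sleep(5); // simulate a slow HDFS write
        inFlight.decrementAndGet();
      });
    }
    PooledWriter pooled = new PooledWriter(deque);
    ExecutorService executor = Executors.newFixedThreadPool(tasks);
    for (int t = 0; t < tasks; t++) {
      executor.submit(() -> {
        pooled.write(Collections.singletonList("block"));
        return null;
      });
    }
    executor.shutdown();
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return maxSeen.get();
  }

  public static void main(String[] args) {
    System.out.println("max in flight: " + maxInFlight(2, 8));
  }
}
```

Because each fake handler counts itself as in flight only while it is held, the reported maximum can never exceed the pool size (2 here), no matter how many threads call `write()`.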
--
This is an automated message from the Apache Git Service.