zuston commented on code in PR #1461:
URL:
https://github.com/apache/incubator-uniffle/pull/1461#discussion_r1456945231
##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -124,6 +128,101 @@ private void recordSuccess(ShuffleDataFlushEvent event,
long start) {
}
}
+ /**
+ * The method to handle flush event to flush blocks into persistent storage.
And we will not
+ * change any internal state for event, that means the event is read-only
for this processing.
+ *
+ * <p>Only the blocks are flushed successfully, it can return directly,
otherwise it should always
+ * throw dedicated exception.
+ *
+ * @param event
+ * @throws Exception
+ */
+ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
+ try {
+ ShuffleServerMetrics.gaugeWriteHandler.inc();
+
+ if (!event.isValid()) {
+ LOG.warn(
+ "AppId {} was removed already, event:{} should be dropped",
event.getAppId(), event);
+ // we should catch this to avoid cleaning up duplicate.
+ throw new EventInvalidException();
+ }
+
+ if (reachRetryMax(event)) {
+ LOG.warn("The event:{] has been reached to max retry times, it will be
dropped.", event);
+ throw new EventDiscardException();
+ }
+
+ List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+ if (CollectionUtils.isEmpty(blocks)) {
+ LOG.info("There is no block to be flushed: {}", event);
+ return;
+ }
+
+ Storage storage = event.getUnderStorage();
+ if (storage == null) {
+ LOG.error("Storage selected is null and this should not happen. event:
{}", event);
+ throw new EventDiscardException();
+ }
+
+ if (event.isPended()
+ && System.currentTimeMillis() - event.getStartPendingTime()
+ > pendingEventTimeoutSec * 1000L) {
+ LOG.error(
+ "Flush event cannot be flushed for {} sec, the event {} is
dropped",
+ pendingEventTimeoutSec,
+ event);
+ throw new EventDiscardException();
+ }
+
+ if (!storage.canWrite()) {
+ LOG.error(
+ "The event: {} is limited to flush due to storage:{} can't write",
event, storage);
+ throw new EventRetryException();
+ }
+
+ String user =
+ StringUtils.defaultString(
+
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
+ StringUtils.EMPTY);
+ int maxConcurrencyPerPartitionToWrite =
getMaxConcurrencyPerPartitionWrite(event);
+ CreateShuffleWriteHandlerRequest request =
+ new CreateShuffleWriteHandlerRequest(
+ storageType,
+ event.getAppId(),
+ event.getShuffleId(),
+ event.getStartPartition(),
+ event.getEndPartition(),
+ storageBasePaths.toArray(new String[storageBasePaths.size()]),
+ getShuffleServerId(),
+ hadoopConf,
+ storageDataReplica,
+ user,
+ maxConcurrencyPerPartitionToWrite);
+ ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+ boolean writeSuccess = storageManager.write(storage, handler, event);
+ if (!writeSuccess) {
+ throw new EventRetryException();
+ }
+
+ // update some metrics for shuffle task
+ updateCommittedBlockIds(event.getAppId(), event.getShuffleId(),
event.getShuffleBlocks());
+ ShuffleTaskInfo shuffleTaskInfo =
+
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
+ if (null != shuffleTaskInfo) {
+ String storageHost = event.getUnderStorage().getStorageHost();
+ if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
+ shuffleTaskInfo.addOnLocalFileDataSize(event.getSize());
+ } else {
+ shuffleTaskInfo.addOnHadoopDataSize(event.getSize());
+ }
+ }
+ } finally {
+ ShuffleServerMetrics.gaugeWriteHandler.dec();
+ }
+ }
+
public void processEvent(ShuffleDataFlushEvent event) {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]