This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push: new 2fda21195 [CELEBORN-2055] Fix some typos 2fda21195 is described below commit 2fda21195985f966dedf779bb4dc8b7f998d1cc8 Author: codenohup <huangxu.wal...@gmail.com> AuthorDate: Thu Jul 10 12:01:02 2025 +0800 [CELEBORN-2055] Fix some typos ### What changes were proposed in this pull request? Inspired by [FLINK-38038](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-38038?filter=allissues), I used [Tongyi Lingma](https://lingma.aliyun.com/) and qwen3-thinking LLM to identify and fix some typo issues in the Celeborn codebase. For example: - backLog → backlog - won`t → won't - can to be read → can be read - mapDataPartition → mapPartitionData - UserDefinePasswordAuthenticationProviderImpl → UserDefinedPasswordAuthenticationProviderImpl ### Why are the changes needed? Remove typos to improve source code readability for users and ease development for developers. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Code and documentation cleanup does not require additional testing. Closes #3356 from codenohup/fix-typo. 
Authored-by: codenohup <huangxu.wal...@gmail.com> Signed-off-by: SteNicholas <programg...@163.com> (cherry picked from commit 0fa600ade1ca362ec8bc9156edfc6eac09689a5e) Signed-off-by: SteNicholas <programg...@163.com> --- charts/celeborn/templates/worker/_helpers.tpl | 4 ++-- charts/celeborn/values.yaml | 2 +- .../org/apache/celeborn/cli/TestCelebornCliCommands.scala | 8 ++++---- .../celeborn/plugin/flink/client/CelebornBufferStream.java | 6 +++--- .../plugin/flink/tiered/CelebornChannelBufferReader.java | 4 +--- .../plugin/flink/tiered/CelebornTierConsumerAgent.java | 4 ++-- .../celeborn/plugin/flink/tiered/CelebornTierFactory.java | 4 ++-- .../plugin/flink/tiered/CelebornChannelBufferReader.java | 4 +--- .../plugin/flink/tiered/CelebornTierConsumerAgent.java | 4 ++-- .../celeborn/plugin/flink/tiered/CelebornTierFactory.java | 4 ++-- .../java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 2 +- .../java/org/apache/celeborn/client/ShuffleClientImpl.java | 6 +++--- .../scala/org/apache/celeborn/client/LifecycleManager.scala | 4 ++-- .../apache/celeborn/client/RequestLocationCallContext.scala | 2 +- .../org/apache/celeborn/client/commit/CommitHandler.scala | 2 +- .../java/org/apache/celeborn/client/ShuffleClientSuiteJ.java | 8 ++++---- .../celeborn/common/network/server/TransportServer.java | 4 ++-- .../apache/celeborn/common/network/util/TransportConf.java | 2 +- .../apache/celeborn/common/protocol/message/StatusCode.java | 4 ++-- cpp/celeborn/client/reader/WorkerPartitionReader.h | 2 +- .../org/apache/celeborn/service/deploy/master/Master.scala | 8 ++++---- .../common/http/ApiBaseResourceAuthenticationSuite.scala | 12 ++++++------ ...a => UserDefinedPasswordAuthenticationProviderImpl.scala} | 6 +++--- .../celeborn/service/deploy/worker/memory/BufferQueue.java | 2 +- .../celeborn/service/deploy/worker/memory/MemoryManager.java | 8 ++++---- .../service/deploy/worker/storage/CreditStreamManager.java | 8 ++++---- 
.../service/deploy/worker/storage/MapPartitionData.java | 2 +- .../worker/storage/segment/SegmentMapPartitionData.java | 6 +++--- .../storage/segment/SegmentMapPartitionDataReader.java | 2 +- .../apache/celeborn/service/deploy/worker/Controller.scala | 4 ++-- .../deploy/worker/storage/CreditStreamManagerSuiteJ.java | 4 ++-- 31 files changed, 69 insertions(+), 73 deletions(-) diff --git a/charts/celeborn/templates/worker/_helpers.tpl b/charts/celeborn/templates/worker/_helpers.tpl index c135972ef..8d9e86266 100644 --- a/charts/celeborn/templates/worker/_helpers.tpl +++ b/charts/celeborn/templates/worker/_helpers.tpl @@ -16,7 +16,7 @@ limitations under the License. */}} {{/* -Common labels for Celeborn master resources +Common labels for Celeborn worker resources */}} {{- define "celeborn.worker.labels" -}} {{ include "celeborn.labels" . }} @@ -24,7 +24,7 @@ app.kubernetes.io/role: worker {{- end }} {{/* -Selector labels for Celeborn master pods +Selector labels for Celeborn worker pods */}} {{- define "celeborn.worker.selectorLabels" -}} {{ include "celeborn.selectorLabels" . }} diff --git a/charts/celeborn/values.yaml b/charts/celeborn/values.yaml index 5bf607a88..f77177c4b 100644 --- a/charts/celeborn/values.yaml +++ b/charts/celeborn/values.yaml @@ -35,7 +35,7 @@ image: tag: "" # -- Image pull policy pullPolicy: Always - # -- Image name for init containter. (your-private-repo/alpine:3.18) + # -- Image name for init container. 
(your-private-repo/alpine:3.18) initContainerImage: alpine:3.18 # -- Image pull secrets for private image registry diff --git a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala index aa8a60151..5422239d1 100644 --- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala +++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala @@ -26,7 +26,7 @@ import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.cli.config.CliConfigManager import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.authentication.HttpAuthSchemes -import org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl} +import org.apache.celeborn.server.common.http.authentication.{UserDefinedPasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl} import org.apache.celeborn.service.deploy.MiniClusterFeature import org.apache.celeborn.service.deploy.master.Master import org.apache.celeborn.service.deploy.worker.Worker @@ -38,11 +38,11 @@ class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature { .set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC")) .set( CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER, - classOf[UserDefinePasswordAuthenticationProviderImpl].getName) + classOf[UserDefinedPasswordAuthenticationProviderImpl].getName) .set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC")) .set( CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER, - classOf[UserDefinePasswordAuthenticationProviderImpl].getName) + classOf[UserDefinedPasswordAuthenticationProviderImpl].getName) .set(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER)) .set(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER)) .set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "DB") @@ -54,7 +54,7 @@ 
class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature { private val BASIC_AUTH_HEADER = HttpAuthSchemes.BASIC + " " + new String( Base64.getEncoder.encode( - s"$CELEBORN_ADMINISTER:${UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD}".getBytes()), + s"$CELEBORN_ADMINISTER:${UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD}".getBytes()), StandardCharsets.UTF_8) protected var master: Master = _ diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java index b63757d2d..a0a498ff4 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java @@ -193,7 +193,7 @@ public class CelebornBufferStream { } } - private void cleanStream(long streamId) { + private void cleanupStream(long streamId) { if (isOpenSuccess) { mapShuffleClient.getReadClientHandler().removeHandler(streamId); clientFactory.unregisterSupplier(streamId); @@ -204,7 +204,7 @@ public class CelebornBufferStream { public void close() { synchronized (lock) { - cleanStream(streamId); + cleanupStream(streamId); isClosed = true; } } @@ -222,7 +222,7 @@ public class CelebornBufferStream { locations.length); if (currentLocationIndex.get() > 0) { logger.debug("Get end streamId {}", endedStreamId); - cleanStream(endedStreamId); + cleanupStream(endedStreamId); } if (currentLocationIndex.get() < locations.length) { diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java index e245efab9..883d90c89 100644 --- 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java @@ -46,9 +46,7 @@ import org.apache.celeborn.plugin.flink.client.CelebornBufferStream; import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl; import org.apache.celeborn.plugin.flink.protocol.SubPartitionReadData; -/** - * Wrap the {@link CelebornBufferStream}, utilize in flink hybrid shuffle integration strategy now. - */ +/** Wrap the {@link CelebornBufferStream}, used in flink hybrid shuffle integration strategy now. */ public class CelebornChannelBufferReader { private static final Logger LOG = LoggerFactory.getLogger(CelebornChannelBufferReader.class); diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java index 0c5c454ee..7bfd9d163 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java @@ -79,7 +79,7 @@ public class CelebornTierConsumerAgent implements TierConsumerAgent { /** * partitionId -> subPartitionId -> reader, note that subPartitions may share the same reader, as - * a single reader can consume multiple subPartitions to improvement performance. + * a single reader can consume multiple subPartitions to improve performance. */ private final Map< TieredStoragePartitionId, Map<TieredStorageSubpartitionId, CelebornChannelBufferReader>> @@ -111,7 +111,7 @@ public class CelebornTierConsumerAgent implements TierConsumerAgent { /** * The notify target is flink inputGate, used in notify input gate which subPartition contain - * shuffle data that can to be read. 
+ * shuffle data that can be read. */ private AvailabilityNotifier availabilityNotifier; diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java index c9913d132..3c821363e 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java @@ -56,7 +56,7 @@ public class CelebornTierFactory implements TierFactory { * The max bytes size of a single segment, it will determine how many buffer can save in a single * segment. */ - private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + private static int MAX_BYTES_PER_SEGMENT = 8 * 1024 * 1024; private static final String CELEBORN_TIER_NAME = CelebornTierFactory.class.getSimpleName(); @@ -106,7 +106,7 @@ public class CelebornTierFactory implements TierFactory { partitionId, numPartitions, numSubpartitions, - NUM_BYTES_PER_SEGMENT, + MAX_BYTES_PER_SEGMENT, bufferSizeBytes, storageMemoryManager, resourceRegistry, diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java index e245efab9..883d90c89 100644 --- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java +++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java @@ -46,9 +46,7 @@ import org.apache.celeborn.plugin.flink.client.CelebornBufferStream; import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl; import org.apache.celeborn.plugin.flink.protocol.SubPartitionReadData; -/** - * Wrap the {@link CelebornBufferStream}, utilize in flink hybrid 
shuffle integration strategy now. - */ +/** Wrap the {@link CelebornBufferStream}, used in flink hybrid shuffle integration strategy now. */ public class CelebornChannelBufferReader { private static final Logger LOG = LoggerFactory.getLogger(CelebornChannelBufferReader.class); diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java index 0c5c454ee..7bfd9d163 100644 --- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java +++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java @@ -79,7 +79,7 @@ public class CelebornTierConsumerAgent implements TierConsumerAgent { /** * partitionId -> subPartitionId -> reader, note that subPartitions may share the same reader, as - * a single reader can consume multiple subPartitions to improvement performance. + * a single reader can consume multiple subPartitions to improve performance. */ private final Map< TieredStoragePartitionId, Map<TieredStorageSubpartitionId, CelebornChannelBufferReader>> @@ -111,7 +111,7 @@ public class CelebornTierConsumerAgent implements TierConsumerAgent { /** * The notify target is flink inputGate, used in notify input gate which subPartition contain - * shuffle data that can to be read. + * shuffle data that can be read. 
*/ private AvailabilityNotifier availabilityNotifier; diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java index aeaa81831..fbb17f4f7 100644 --- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java +++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java @@ -59,7 +59,7 @@ public class CelebornTierFactory implements TierFactory { * The max bytes size of a single segment, it will determine how many buffer can save in a single * segment. */ - private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + private static int MAX_BYTES_PER_SEGMENT = 8 * 1024 * 1024; private static final String CELEBORN_TIER_NAME = CelebornTierFactory.class.getSimpleName(); @@ -110,7 +110,7 @@ public class CelebornTierFactory implements TierFactory { partitionId, numPartitions, numSubpartitions, - NUM_BYTES_PER_SEGMENT, + MAX_BYTES_PER_SEGMENT, bufferSizeBytes, storageMemoryManager, resourceRegistry, diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java index a3a1add71..bd57cd329 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java @@ -118,7 +118,7 @@ public class SparkUtils { field.setAccessible(true); return (SQLMetric) field.get(serializer); } catch (NoSuchFieldException | IllegalAccessException e) { - logger.warn("Failed to get dataSize metric, aqe won`t work properly."); + logger.warn("Failed to get dataSize metric, aqe won't work properly."); } return null; } diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index d9a1e67d6..828eb3158 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -903,7 +903,7 @@ public class ShuffleClientImpl extends ShuffleClient { } else if (StatusCode.STAGE_ENDED.getValue() == statusCode) { stageEndShuffleSet.add(shuffleId); return results; - } else if (StatusCode.SHUFFLE_NOT_REGISTERED.getValue() == statusCode) { + } else if (StatusCode.SHUFFLE_UNREGISTERED.getValue() == statusCode) { logger.error("SHUFFLE_NOT_REGISTERED!"); return null; } @@ -1832,7 +1832,7 @@ public class ShuffleClientImpl extends ShuffleClient { response.pushFailedBatches()), null, null); - case SHUFFLE_NOT_REGISTERED: + case SHUFFLE_UNREGISTERED: logger.warn( "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId); // return empty result @@ -1844,7 +1844,7 @@ public class ShuffleClientImpl extends ShuffleClient { response.pushFailedBatches()), null, null); - case STAGE_END_TIME_OUT: + case STAGE_END_TIMEOUT: case SHUFFLE_DATA_LOST: exceptionMsg = String.format( diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 9063ce7b8..b339c75e6 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -797,7 +797,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends logError(s"[handleRevive] shuffle $shuffleId not registered!") contextWrapper.reply( -1, - StatusCode.SHUFFLE_NOT_REGISTERED, + StatusCode.SHUFFLE_UNREGISTERED, None, false) return @@ -871,7 +871,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends if (!registeredShuffle.contains(shuffleId) && !isSegmentGranularityVisible) { 
logWarning(s"[handleGetReducerFileGroup] shuffle $shuffleId not registered, maybe no shuffle data within this stage.") context.reply(GetReducerFileGroupResponse( - StatusCode.SHUFFLE_NOT_REGISTERED, + StatusCode.SHUFFLE_UNREGISTERED, JavaUtils.newConcurrentHashMap(), Array.empty, serdeVersion = serdeVersion)) diff --git a/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala b/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala index d306546b3..091960a4c 100644 --- a/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala +++ b/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala @@ -57,7 +57,7 @@ case class ChangeLocationsCallContext( } newLocs.put(partitionId, (status, available, partitionLocationOpt.getOrElse(null))) - if (newLocs.size() == partitionCount || StatusCode.SHUFFLE_NOT_REGISTERED == status + if (newLocs.size() == partitionCount || StatusCode.SHUFFLE_UNREGISTERED == status || StatusCode.STAGE_ENDED == status) { context.reply(ChangeLocationResponse(endedMapIds, newLocs)) } diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala index 4a8b542f8..77034d2bd 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala @@ -335,7 +335,7 @@ abstract class CommitHandler( status.future.value.get match { case scala.util.Success(res) => res.status match { - case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED | StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION => + case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_UNREGISTERED | StatusCode.REQUEST_FAILED | StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION => if (res.status == 
StatusCode.SUCCESS) { logDebug(s"Request commitFiles return ${res.status} for " + s"$shuffleKey from worker ${worker.readableAddress()}") diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java index 85cf0ba10..d647e60b9 100644 --- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java +++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java @@ -483,7 +483,7 @@ public class ShuffleClientSuiteJ { .thenAnswer( t -> { return GetReducerFileGroupResponse$.MODULE$.apply( - StatusCode.SHUFFLE_NOT_REGISTERED, + StatusCode.SHUFFLE_UNREGISTERED, locations, new int[0], Collections.emptySet(), @@ -496,7 +496,7 @@ public class ShuffleClientSuiteJ { .thenAnswer( t -> { return GetReducerFileGroupResponse$.MODULE$.apply( - StatusCode.SHUFFLE_NOT_REGISTERED, + StatusCode.SHUFFLE_UNREGISTERED, locations, new int[0], Collections.emptySet(), @@ -519,7 +519,7 @@ public class ShuffleClientSuiteJ { .thenAnswer( t -> { return GetReducerFileGroupResponse$.MODULE$.apply( - StatusCode.STAGE_END_TIME_OUT, + StatusCode.STAGE_END_TIMEOUT, locations, new int[0], Collections.emptySet(), @@ -532,7 +532,7 @@ public class ShuffleClientSuiteJ { .thenAnswer( t -> { return GetReducerFileGroupResponse$.MODULE$.apply( - StatusCode.STAGE_END_TIME_OUT, + StatusCode.STAGE_END_TIMEOUT, locations, new int[0], Collections.emptySet(), diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java index 1808a000c..a8e7e752b 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java +++ b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java @@ -104,8 +104,8 @@ public class TransportServer implements Closeable { .childOption(ChannelOption.SO_KEEPALIVE, true) 
.childOption(ChannelOption.ALLOCATOR, allocator); - if (conf.backLog() > 0) { - bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog()); + if (conf.backlog() > 0) { + bootstrap.option(ChannelOption.SO_BACKLOG, conf.backlog()); } if (conf.receiveBuf() > 0) { diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java index 973064451..55b22306d 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java @@ -70,7 +70,7 @@ public class TransportConf { } /** Requested maximum length of the queue of incoming connections. Default 0 for no backlog. */ - public int backLog() { + public int backlog() { return celebornConf.networkIoBacklog(module); } diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java index 46c59ab52..8987e8d83 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java @@ -31,7 +31,7 @@ public enum StatusCode { // Specific Status SHUFFLE_ALREADY_REGISTERED(3), - SHUFFLE_NOT_REGISTERED(4), + SHUFFLE_UNREGISTERED(4), RESERVE_SLOTS_FAILED(5), SLOT_NOT_AVAILABLE(6), WORKER_NOT_FOUND(7), @@ -54,7 +54,7 @@ public enum StatusCode { HARD_SPLIT(21), SOFT_SPLIT(22), - STAGE_END_TIME_OUT(23), + STAGE_END_TIMEOUT(23), SHUFFLE_DATA_LOST(24), WORKER_SHUTDOWN(25), NO_AVAILABLE_WORKING_DIR(26), diff --git a/cpp/celeborn/client/reader/WorkerPartitionReader.h b/cpp/celeborn/client/reader/WorkerPartitionReader.h index 68fb00230..db22da8c2 100644 --- a/cpp/celeborn/client/reader/WorkerPartitionReader.h +++ b/cpp/celeborn/client/reader/WorkerPartitionReader.h @@ -87,7 +87,7 @@ class WorkerPartitionReader 
static constexpr auto kDefaultConsumeIter = std::chrono::milliseconds(500); - // TODO: add other params, such as fetchChunkRetryCnt, fetchChunkMaxRetry + // TODO: add other params, such as fetchChunkRetryCnt, fetchChunkMaxRetries }; } // namespace client } // namespace celeborn diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index a91996986..52d52248a 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -162,7 +162,7 @@ private[celeborn] class Master( logError(msg, ioe) System.exit(1) } else { - logError("Face unexpected IO exception during staring Ratis server", ioe) + logError("Face unexpected IO exception during starting Ratis server", ioe) } } sys @@ -174,7 +174,7 @@ private[celeborn] class Master( // Threads private val forwardMessageThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-message-forwarder") - private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _ + private var checkForWorkerTimeoutTask: ScheduledFuture[_] = _ private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _ private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _ private var checkForDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _ @@ -336,7 +336,7 @@ private[celeborn] class Master( "send-application-meta") } - checkForWorkerTimeOutTask = scheduleCheckTask(workerHeartbeatTimeoutMs, pbCheckForWorkerTimeout) + checkForWorkerTimeoutTask = scheduleCheckTask(workerHeartbeatTimeoutMs, pbCheckForWorkerTimeout) checkForApplicationTimeOutTask = scheduleCheckTask(appHeartbeatTimeoutMs / 2, CheckForApplicationTimeOut) @@ -370,7 +370,7 @@ private[celeborn] class Master( return } logInfo("Stopping Celeborn Master.") - Option(checkForWorkerTimeOutTask).foreach(_.cancel(true)) + 
Option(checkForWorkerTimeoutTask).foreach(_.cancel(true)) Option(checkForUnavailableWorkerTimeOutTask).foreach(_.cancel(true)) Option(checkForApplicationTimeOutTask).foreach(_.cancel(true)) Option(checkForDFSRemnantDirsTimeOutTask).foreach(_.cancel(true)) diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala index 34978b619..7e62a331d 100644 --- a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala +++ b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala @@ -26,7 +26,7 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.authentication.HttpAuthSchemes import org.apache.celeborn.common.network.TestHelper import org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER -import org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl} +import org.apache.celeborn.server.common.http.authentication.{UserDefinedPasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl} abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper { val administers = Seq("celeborn", "celeborn2") @@ -38,14 +38,14 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper { .set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC", "BEARER")) .set( CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER, - classOf[UserDefinePasswordAuthenticationProviderImpl].getName) + classOf[UserDefinedPasswordAuthenticationProviderImpl].getName) .set( CelebornConf.MASTER_HTTP_AUTH_BEARER_PROVIDER, classOf[UserDefineTokenAuthenticationProviderImpl].getName) .set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC", "BEARER")) .set( 
CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER, - classOf[UserDefinePasswordAuthenticationProviderImpl].getName) + classOf[UserDefinedPasswordAuthenticationProviderImpl].getName) .set( CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER, classOf[UserDefineTokenAuthenticationProviderImpl].getName) @@ -67,7 +67,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper { AUTHORIZATION_HEADER, basicAuthorizationHeader( "user", - UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD)) + UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD)) .get() assert(HttpServletResponse.SC_OK == response.getStatus) @@ -126,7 +126,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper { AUTHORIZATION_HEADER, basicAuthorizationHeader( "no_admin", - UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD)) + UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD)) .post(null) assert(HttpServletResponse.SC_FORBIDDEN == response.getStatus) @@ -137,7 +137,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper { AUTHORIZATION_HEADER, basicAuthorizationHeader( admin, - UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD)) + UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD)) .post(null) // pass the admin privilege check, but the api is not found assert(HttpServletResponse.SC_NOT_FOUND == response.getStatus) diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinedPasswordAuthenticationProviderImpl.scala similarity index 91% rename from service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala rename to service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinedPasswordAuthenticationProviderImpl.scala index 
38db5e49a..0c4c733b5 100644 --- a/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala +++ b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinedPasswordAuthenticationProviderImpl.scala @@ -21,10 +21,10 @@ import java.security.Principal import javax.security.sasl.AuthenticationException import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.server.common.http.authentication.UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD +import org.apache.celeborn.server.common.http.authentication.UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD import org.apache.celeborn.spi.authentication.{BasicPrincipal, Credential, PasswdAuthenticationProvider, PasswordCredential} -class UserDefinePasswordAuthenticationProviderImpl +class UserDefinedPasswordAuthenticationProviderImpl extends PasswdAuthenticationProvider with Logging { override def authenticate(credential: PasswordCredential): Principal = { val clientIp = credential.extraInfo.get(Credential.CLIENT_IP_KEY) @@ -37,6 +37,6 @@ class UserDefinePasswordAuthenticationProviderImpl } } -object UserDefinePasswordAuthenticationProviderImpl { +object UserDefinedPasswordAuthenticationProviderImpl { val VALID_PASSWORD = "password" } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/BufferQueue.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/BufferQueue.java index 10de48a32..e7b49506a 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/BufferQueue.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/BufferQueue.java @@ -30,7 +30,7 @@ import io.netty.buffer.ByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize) +// Assume that max-managed memory for a MapPartitionData is (2^31 * 
buffersize) public class BufferQueue { public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class); diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index ff08c077c..6f4922db0 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -209,10 +209,10 @@ public class MemoryManager { () -> { try { if (creditStreamManager != null) { - int mapDataPartitionCount = creditStreamManager.getActiveMapPartitionCount(); - if (mapDataPartitionCount > 0) { + int mapPartitionDataCount = creditStreamManager.getActiveMapPartitionCount(); + if (mapPartitionDataCount > 0) { long currentTarget = - (long) Math.ceil(readBufferTarget * 1.0 / mapDataPartitionCount); + (long) Math.ceil(readBufferTarget * 1.0 / mapPartitionDataCount); if (Math.abs(lastNotifiedTarget - currentTarget) > readBufferTargetNotifyThreshold) { synchronized (readBufferTargetChangeListeners) { @@ -220,7 +220,7 @@ public class MemoryManager { "read buffer target changed {} -> {} active map partition count {}", lastNotifiedTarget, currentTarget, - mapDataPartitionCount); + mapPartitionDataCount); for (ReadBufferTargetChangeListener changeListener : readBufferTargetChangeListeners) { changeListener.onChange(currentTarget); diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java index 0a3736a38..e8f408afe 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java @@ -200,7 +200,7 @@ public class CreditStreamManager { 
logger.warn("Ignore AddCredit from stream {}, numCredit {}.", streamId, numCredit); return; } - MapPartitionData mapPartitionData = streams.get(streamId).getMapDataPartition(); + MapPartitionData mapPartitionData = streams.get(streamId).getMapPartitionData(); addCredit(mapPartitionData, numCredit, streamId); } @@ -208,7 +208,7 @@ public class CreditStreamManager { StreamState streamState = streams.get(streamId); if (streamState != null) { notifyRequiredSegment( - streamState.getMapDataPartition(), requiredSegmentId, streamId, subPartitionId); + streamState.getMapPartitionData(), requiredSegmentId, streamId, subPartitionId); } else { // In flink hybrid shuffle integration strategy, the stream may release in worker before // client receive bufferStreamEnd, @@ -279,7 +279,7 @@ public class CreditStreamManager { public void cleanResource(Long streamId) { logger.debug("received clean stream: {}", streamId); if (streams.containsKey(streamId)) { - MapPartitionData mapPartitionData = streams.get(streamId).getMapDataPartition(); + MapPartitionData mapPartitionData = streams.get(streamId).getMapPartitionData(); if (mapPartitionData != null) { if (mapPartitionData.releaseReader(streamId)) { streams.remove(streamId); @@ -340,7 +340,7 @@ public class CreditStreamManager { return bufferSize; } - public MapPartitionData getMapDataPartition() { + public MapPartitionData getMapPartitionData() { return mapPartitionData; } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java index 46577e049..d88370edf 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java @@ -260,7 +260,7 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis @Override public String toString() 
{ - return "MapDataPartition{" + "fileInfo=" + diskFileInfo.getFilePath() + '}'; + return "MapPartitionData{" + "fileInfo=" + diskFileInfo.getFilePath() + '}'; } public ConcurrentHashMap<Long, MapPartitionDataReader> getReaders() { diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java index aa3090a3d..e8d467d8f 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java @@ -57,7 +57,7 @@ public class SegmentMapPartitionData extends MapPartitionData { @Override public void setupDataPartitionReader( int startSubIndex, int endSubIndex, long streamId, Channel channel) { - SegmentMapPartitionDataReader mapDataPartitionReader = + SegmentMapPartitionDataReader mapPartitionDataReader = new SegmentMapPartitionDataReader( startSubIndex, endSubIndex, @@ -70,7 +70,7 @@ public class SegmentMapPartitionData extends MapPartitionData { startSubIndex, endSubIndex, streamId); - readers.put(streamId, mapDataPartitionReader); + readers.put(streamId, mapPartitionDataReader); } @Override @@ -85,7 +85,7 @@ public class SegmentMapPartitionData extends MapPartitionData { @Override public String toString() { - return String.format("SegmentMapDataPartition{filePath=%s}", diskFileInfo.getFilePath()); + return String.format("SegmentMapPartitionData{filePath=%s}", diskFileInfo.getFilePath()); } public void notifyRequiredSegmentId(int segmentId, long streamId, int subPartitionId) { diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionDataReader.java index e65476739..522036b6a 
100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionDataReader.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionDataReader.java @@ -211,7 +211,7 @@ public class SegmentMapPartitionDataReader extends MapPartitionDataReader { @Override public String toString() { - final StringBuilder sb = new StringBuilder("SegmentMapDataPartitionReader{"); + final StringBuilder sb = new StringBuilder("SegmentMapPartitionDataReader{"); sb.append("startPartitionIndex=").append(startPartitionIndex); sb.append(", endPartitionIndex=").append(endPartitionIndex); sb.append(", streamId=").append(streamId); diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 64624e9ad..f01792729 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -426,7 +426,7 @@ private[deploy] class Controller( logError(s"Shuffle $shuffleKey doesn't exist!") context.reply( CommitFilesResponse( - StatusCode.SHUFFLE_NOT_REGISTERED, + StatusCode.SHUFFLE_UNREGISTERED, List.empty.asJava, List.empty.asJava, primaryIds, @@ -681,7 +681,7 @@ private[deploy] class Controller( logWarning(s"Shuffle $shuffleKey not registered!") context.reply( DestroyWorkerSlotsResponse( - StatusCode.SHUFFLE_NOT_REGISTERED, + StatusCode.SHUFFLE_UNREGISTERED, primaryLocations, replicaLocations)) return diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java index d886cc864..15e28c856 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java +++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java @@ -102,9 +102,9 @@ public class CreditStreamManagerSuiteJ { streamIdConsumer, channel, shuffleKey, 0, 1, 1, diskFileInfo); MapPartitionData mapPartitionData1 = - creditStreamManager.getStreams().get(registerStream1).getMapDataPartition(); + creditStreamManager.getStreams().get(registerStream1).getMapPartitionData(); MapPartitionData mapPartitionData2 = - creditStreamManager.getStreams().get(registerStream2).getMapDataPartition(); + creditStreamManager.getStreams().get(registerStream2).getMapPartitionData(); Assert.assertEquals(mapPartitionData1, mapPartitionData2); mapPartitionData1.getStreamReader(registerStream1).recycle();