This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 0e35a85be [CELEBORN-810] Fix some typos and grammar
0e35a85be is described below
commit 0e35a85be9d12e7b61a07862f18259d3b690f5f4
Author: onebox-li <[email protected]>
AuthorDate: Wed Jul 19 18:35:38 2023 +0800
[CELEBORN-810] Fix some typos and grammar
### What changes were proposed in this pull request?
Fix some typos and grammar
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested.
Closes #1733 from onebox-li/fix-typo.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 405b2801fa96c18c987d26bf16a777041da0410e)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/plugin/flink/buffer/SortBuffer.java | 2 +-
.../spark/shuffle/celeborn/HashBasedShuffleWriter.java | 8 ++++----
.../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 2 +-
.../java/org/apache/celeborn/client/ShuffleClientImpl.java | 2 +-
.../scala/org/apache/celeborn/client/CommitManager.scala | 2 +-
.../java/org/apache/celeborn/common/client/MasterClient.java | 2 +-
.../org/apache/celeborn/common/network/TransportContext.java | 6 +++---
.../apache/celeborn/common/network/buffer/ManagedBuffer.java | 2 +-
.../common/network/client/TransportClientFactory.java | 2 +-
.../celeborn/common/network/protocol/ChunkFetchSuccess.java | 4 ++--
.../celeborn/common/network/protocol/MessageWithHeader.java | 2 +-
.../common/network/server/TransportChannelHandler.java | 2 +-
.../org/apache/celeborn/common/util/ShutdownHookManager.java | 2 +-
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 12 ++++++------
.../celeborn/common/protocol/PartitionLocationSuiteJ.java | 2 +-
docs/celeborn_ratis_shell.md | 6 +++---
docs/configuration/client.md | 6 +++---
docs/configuration/index.md | 2 +-
docs/configuration/quota.md | 2 +-
docs/monitoring.md | 2 +-
.../service/deploy/master/clustermeta/ha/HARaftServer.java | 2 +-
.../service/deploy/master/clustermeta/ha/StateMachine.java | 2 +-
.../org/apache/celeborn/tests/flink/HeartbeatTest.scala | 6 +++---
.../deploy/worker/storage/MapDataPartitionReader.java | 2 +-
.../celeborn/service/deploy/worker/PushDataHandler.scala | 12 ++++++------
25 files changed, 47 insertions(+), 47 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
index 19f979992..7318d6bb1 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
@@ -55,7 +55,7 @@ public interface SortBuffer {
/** Returns true if there is still data can be consumed in this {@link
SortBuffer}. */
boolean hasRemaining();
- /** Finishes this {@link SortBuffer} which means no record can be appended
any more. */
+ /** Finishes this {@link SortBuffer} which means no record can be appended
anymore. */
void finish();
/** Whether this {@link SortBuffer} is finished or not. */
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 0684c72f8..dee8d897a 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -96,7 +96,7 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
/**
* Are we in the process of stopping? Because map tasks can call stop() with
success = true and
* then call stop() with success = false if they get an exception, we want
to make sure we don't
- * try deleting files, etc twice.
+ * try deleting files, etc. twice.
*/
private volatile boolean stopping = false;
@@ -393,9 +393,9 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private void closeColumnarWrite() throws IOException {
SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer)
dep.serializer());
for (int i = 0; i < numPartitions; i++) {
- final CelebornBatchBuilder buidlers = celebornBatchBuilders[i];
- if (buidlers != null && buidlers.getRowCnt() > 0) {
- byte[] buffers = buidlers.buildColumnBytes();
+ final CelebornBatchBuilder builders = celebornBatchBuilders[i];
+ if (builders != null && builders.getRowCnt() > 0) {
+ byte[] buffers = builders.buildColumnBytes();
if (dataSize != null) {
dataSize.add(buffers.length);
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 3766c623d..98c694306 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -85,7 +85,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
/**
* Are we in the process of stopping? Because map tasks can call stop() with
success = true and
* then call stop() with success = false if they get an exception, we want
to make sure we don't
- * try deleting files, etc twice.
+ * try deleting files, etc. twice.
*/
private volatile boolean stopping = false;
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index dada674cc..00e95898e 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -605,7 +605,7 @@ public class ShuffleClientImpl extends ShuffleClient {
* @param shuffleMap
* @param partitionId
* @param epoch
- * @param wait wheter to wait for some time for a newer PartitionLocation
+ * @param wait whether to wait for some time for a newer PartitionLocation
* @return
*/
boolean newerPartitionLocationExists(
diff --git
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 316e9ba70..f211f9ba6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -54,7 +54,7 @@ case class ShuffleCommittedInfo(
committedPrimaryStorageInfos: ConcurrentHashMap[String, StorageInfo],
// unique partition id -> storage info
committedReplicaStorageInfos: ConcurrentHashMap[String, StorageInfo],
- // unique partition id -> mapid bitmat
+ // unique partition id -> mapId bitmap
committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap],
// number of partition files
currentShuffleFileCount: LongAdder,
diff --git
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index f49ae6df0..559e409da 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -139,7 +139,7 @@ public class MasterClient {
LOG.debug("Send rpc message " + message);
RpcEndpointRef endpointRef = null;
// Use AtomicInteger or Integer or any Object which holds an int value is
ok, we just need to
- // transfer a object to get the change of the current index of master
addresses.
+ // transfer an object to get the change of the current index of master
addresses.
AtomicInteger currentMasterIdx = new AtomicInteger(0);
long sleepLimitTime = 2000; // 2s
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index c10b4e65b..761304377 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -40,9 +40,9 @@ import
org.apache.celeborn.common.network.util.TransportFrameDecoder;
* setup Netty Channel pipelines with a {@link TransportChannelHandler}.
*
* <p>There are two communication protocols that the TransportClient provides,
control-plane RPCs
- * and data-plane "chunk fetching". The handling of the RPCs is performed
outside of the scope of
- * the TransportContext (i.e., by a user-provided handler), and it is
responsible for setting up
- * streams which can be streamed through the data plane in chunks using
zero-copy IO.
+ * and data-plane "chunk fetching". The handling of the RPCs is performed
outside the scope of the
+ * TransportContext (i.e., by a user-provided handler), and it is responsible
for setting up streams
+ * which can be streamed through the data plane in chunks using zero-copy IO.
*
* <p>The TransportServer and TransportClientFactory both create a
TransportChannelHandler for each
* channel. As each TransportChannelHandler contains a TransportClient, this
enables server
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/buffer/ManagedBuffer.java
b/common/src/main/java/org/apache/celeborn/common/network/buffer/ManagedBuffer.java
index 64d5d1a81..ce320d9d7 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/buffer/ManagedBuffer.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/buffer/ManagedBuffer.java
@@ -64,7 +64,7 @@ public abstract class ManagedBuffer {
public abstract ManagedBuffer release();
/**
- * Convert the buffer into an Netty object, used to write the data out. The
return value is either
+ * Convert the buffer into a Netty object, used to write the data out. The
return value is either
* a {@link io.netty.buffer.ByteBuf} or a {@link
io.netty.channel.FileRegion}.
*
* <p>If this method returns a ByteBuf, then that buffer's reference count
will be incremented and
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index bd01560c0..e4b12fb96 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -120,7 +120,7 @@ public class TransportClientFactory implements Closeable {
throws IOException, InterruptedException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
- // Use unresolved address here to avoid DNS resolution each time we
creates a client.
+ // Use unresolved address here to avoid DNS resolution each time we create
a client.
final InetSocketAddress unresolvedAddress =
InetSocketAddress.createUnresolved(remoteHost, remotePort);
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java
index 21a3581f8..baa663ce6 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java
@@ -26,8 +26,8 @@ import
org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
/**
* Response to {@link ChunkFetchRequest} when a chunk exists and has been
successfully fetched.
*
- * <p>Note that the server-side encoding of this messages does NOT include the
buffer itself, as
- * this may be written by Netty in a more efficient manner (i.e., zero-copy
write). Similarly, the
+ * <p>Note that the server-side encoding of this message does NOT include the
buffer itself, as this
+ * may be written by Netty in a more efficient manner (i.e., zero-copy write).
Similarly, the
* client-side decoding will reuse the Netty ByteBuf as the buffer.
*/
public final class ChunkFetchSuccess extends ResponseMessage {
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java
index 3476d2714..21f11d49f 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java
@@ -138,7 +138,7 @@ class MessageWithHeader extends AbstractFileRegion {
// SPARK-24578: cap the sub-region's size of returned nio buffer to
improve the performance
// for the case that the passed-in buffer has too many components.
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
- // If the ByteBuf holds more then one ByteBuffer we should better call
nioBuffers(...)
+ // If the ByteBuf holds more than one ByteBuffer we should better call
nioBuffers(...)
// to eliminate extra memory copies.
int written = 0;
if (buf.nioBufferCount() == 1) {
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
index b7c119afd..f0eae1a7e 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
@@ -175,7 +175,7 @@ public class TransportChannelHandler extends
ChannelInboundHandlerAdapter {
requestTimeoutNs / 1000 / 1000);
}
if (closeIdleConnections) {
- // While CloseIdleConnections is enable, we also close idle
connection
+ // While CloseIdleConnections is enabled, we also close idle
connection
client.timeOut();
ctx.close();
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
index 0afb63906..be535cafe 100644
---
a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
+++
b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
@@ -296,7 +296,7 @@ public final class ShutdownHookManager {
* Indicates if a shutdownHook is registered or not.
*
* @param shutdownHook shutdownHook to check if registered.
- * @return TRUE/FALSE depending if the shutdownHook is is registered.
+ * @return TRUE/FALSE depending if the shutdownHook is registered.
*/
public boolean hasShutdownHook(Runnable shutdownHook) {
return hooks.contains(new HookEntry(shutdownHook, 0));
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 212dddd0d..66ee2fea0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -600,7 +600,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT)
def haMasterRatisLogPurgeGap: Int = get(HA_MASTER_RATIS_LOG_PURGE_GAP)
def haMasterRatisLogInstallSnapshotEnabled: Boolean =
- get(HA_MASTER_RATIS_LOG_INSTABLL_SNAPSHOT_ENABLED)
+ get(HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED)
def haMasterRatisRpcRequestTimeout: Long =
get(HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT)
def haMasterRatisRetryCacheExpiryTime: Long =
get(HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME)
def haMasterRatisRpcTimeoutMin: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MIN)
@@ -1672,7 +1672,7 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("32MB")
- val HA_MASTER_RATIS_LOG_INSTABLL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] =
+ val HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.install.snapshot.enabled")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.install.snapshot.enabled")
.internal
@@ -3191,7 +3191,7 @@ object CelebornConf extends Logging {
.withAlternative("celeborn.push.stageEnd.timeout")
.categories("client")
.doc(s"Timeout for waiting StageEnd. " +
- s"During this process, there are
`${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for
committing files" +
+ s"During this process, there are
`${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for
committing files " +
s"and 1 times for releasing slots request. User can customize this
value according to your setting. " +
s"By default, the value is the max timeout value
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
.version("0.3.0")
@@ -3230,7 +3230,7 @@ object CelebornConf extends Logging {
buildConf("celeborn.client.rpc.requestPartition.askTimeout")
.categories("client")
.version("0.2.0")
- .doc(s"Timeout for ask operations during requesting change partition
location, such as reviving or spliting partition. " +
+ .doc(s"Timeout for ask operations during requesting change partition
location, such as reviving or splitting partition. " +
s"During this process, there are
`${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for
reserving slots. " +
s"User can customize this value according to your setting. " +
s"By default, the value is the max timeout value
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
@@ -3241,7 +3241,7 @@ object CelebornConf extends Logging {
.categories("client")
.version("0.2.0")
.doc(s"Timeout for ask operations during getting reducer file group
information. " +
- s"During this process, there are
`${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for
committing files" +
+ s"During this process, there are
`${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for
committing files " +
s"and 1 times for releasing slots request. User can customize this
value according to your setting. " +
s"By default, the value is the max timeout value
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
.fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
@@ -3502,7 +3502,7 @@ object CelebornConf extends Logging {
s"`${classOf[DefaultIdentityProvider].getName}`. " +
s"Optional values: " +
s"org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user
name will be obtained by UserGroupInformation.getUserName; " +
- s"org.apache.celeborn.common.identity.DefaultIdentityProvider uesr
name and tenant id are default values or user-specific values.")
+ s"org.apache.celeborn.common.identity.DefaultIdentityProvider user
name and tenant id are default values or user-specific values.")
.version("0.2.0")
.stringConf
.createWithDefault(classOf[DefaultIdentityProvider].getName)
diff --git
a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
index ca87af68e..927b3e9c1 100644
---
a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
@@ -51,7 +51,7 @@ public class PartitionLocationSuiteJ {
for (int i = 2; i < 255; ++i) {
byte otherMode = (byte) i;
- // Should we return replica mode when the parameter passed in is neither
0 or 1?
+ // Should we return replica mode when the parameter passed in is neither
0 nor 1?
assert PartitionLocation.getMode(otherMode) ==
PartitionLocation.Mode.REPLICA;
}
}
diff --git a/docs/celeborn_ratis_shell.md b/docs/celeborn_ratis_shell.md
index 841560849..b7086bb90 100644
--- a/docs/celeborn_ratis_shell.md
+++ b/docs/celeborn_ratis_shell.md
@@ -35,8 +35,8 @@ then it's convenient for Celeborn Admin to operate the master
ratis service.
Celeborn directly introduces the ratis-shell into the project, users don't
need to set up ratis-shell env from ratis repo.
User can directly download the Celeborn source tarball from
[Download](https://celeborn.apache.org/download) and
-build the Celeborn accoriding to
[build_and_test](https://celeborn.apache.org/community/contributor_guide/build_and_test/)
-or just down load the pre-built binary tarball from
[Download](https://celeborn.apache.org/download)
+build the Celeborn according to
[build_and_test](https://celeborn.apache.org/community/contributor_guide/build_and_test/)
+or just download the pre-built binary tarball from
[Download](https://celeborn.apache.org/download)
to get the binary package `apache-celeborn-<VERSION>-bin.tgz`.
After getting the binary package `apache-celeborn-<VERSION>-bin.tgz`:
@@ -64,7 +64,7 @@ Usage: celeborn-ratis sh [generic options]
## generic options
The `generic options` pass values for a given ratis-shell property.
-It support the following content:
+It supports the following content:
`-D*`, `-X*`, `-agentlib*`, `-javaagent*`
```
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 94f5771c8..a91a44df1 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -55,7 +55,7 @@ license: |
| celeborn.client.push.slowStart.maxSleepTime | 2s | If
celeborn.client.push.limit.strategy is set to SLOWSTART, push side will take a
sleep strategy for each batch of requests, this controls the max sleep time if
the max in flight requests limit is 1 for a long time | 0.3.0 |
| celeborn.client.push.sort.randomizePartitionId.enabled | false | Whether to
randomize partitionId in push sorter. If true, partitionId will be randomized
when sort data to avoid skew when push to worker | 0.3.0 |
| celeborn.client.push.splitPartition.threads | 8 | Thread number to process
shuffle split request in shuffle client. | 0.3.0 |
-| celeborn.client.push.stageEnd.timeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for waiting
StageEnd. During this process, there are
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities
for committing filesand 1 times for releasing slots request. User can customize
this value according to your setting. By default, the value is the max timeout
value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 |
+| celeborn.client.push.stageEnd.timeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for waiting
StageEnd. During this process, there are
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities
for committing files and 1 times for releasing slots request. User can
customize this value according to your setting. By default, the value is the
max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 |
| celeborn.client.push.takeTaskMaxWaitAttempts | 1 | Max wait times if no task
available to push to worker. | 0.3.0 |
| celeborn.client.push.takeTaskWaitInterval | 50ms | Wait interval if no task
available to push to worker. | 0.3.0 |
| celeborn.client.push.timeout | 120s | Timeout for a task to push data rpc
message. This value should better be more than twice of
`celeborn.<module>.push.timeoutCheck.interval` | 0.3.0 |
@@ -68,10 +68,10 @@ license: |
| celeborn.client.rpc.cache.concurrencyLevel | 32 | The number of write locks
to update rpc cache. | 0.3.0 |
| celeborn.client.rpc.cache.expireTime | 15s | The time before a cache item is
removed. | 0.3.0 |
| celeborn.client.rpc.cache.size | 256 | The max cache items count for rpc
cache. | 0.3.0 |
-| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for ask operations
during getting reducer file group information. During this process, there are
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities
for committing filesand 1 times for releasing slots request. User can customize
this value according to your setting. By default, the value is the max timeout
value `celeborn.<module>.io.con [...]
+| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for ask operations
during getting reducer file group information. During this process, there are
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities
for committing files and 1 times for releasing slots request. User can
customize this value according to your setting. By default, the value is the
max timeout value `celeborn.<module>.io.co [...]
| celeborn.client.rpc.maxParallelism | 1024 | Max parallelism of client on
sending RPC requests. | 0.3.0 |
| celeborn.client.rpc.registerShuffle.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for ask operations
during register shuffle. During this process, there are two times for retry
opportunities for requesting slots, one request for establishing a connection
with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry
opportunities for reserving slots. User can customize this value according to
your setting. By default, the value is the m [...]
-| celeborn.client.rpc.requestPartition.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for ask operations
during requesting change partition location, such as reviving or spliting
partition. During this process, there are
`celeborn.client.reserveSlots.maxRetries` times for retry opportunities for
reserving slots. User can customize this value according to your setting. By
default, the value is the max timeout value
`celeborn.<module>.io.connectionTime [...]
+| celeborn.client.rpc.requestPartition.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for ask operations
during requesting change partition location, such as reviving or splitting
partition. During this process, there are
`celeborn.client.reserveSlots.maxRetries` times for retry opportunities for
reserving slots. User can customize this value according to your setting. By
default, the value is the max timeout value `celeborn.<module>.io.connectionTim
[...]
| celeborn.client.rpc.reserveSlots.askTimeout | <value of
celeborn.rpc.askTimeout> | Timeout for LifecycleManager request reserve
slots. | 0.3.0 |
| celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms |
Interval for LifecycleManager to schedule handling change partition requests in
batch. | 0.3.0 |
| celeborn.client.shuffle.batchHandleChangePartition.threads | 8 | Threads
number for LifecycleManager to handle change partition request in batch. |
0.3.0 |
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index d1a64cadc..2ee39c40a 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -36,7 +36,7 @@ bufferSize = `celeborn.worker.flusher.buffer.size` # the
amount of memory will
off-heap-memory = bufferSize * estimatedTasks * 2 + network memory
```
-For example, if an Celeborn worker has 10 storage directories or disks and the
buffer size is set to 256 KiB.
+For example, if a Celeborn worker has 10 storage directories or disks and the
buffer size is set to 256 KiB.
The necessary off-heap memory is 10 GiB.
Network memory will be consumed when netty reads from a TPC channel, there
will need some extra
diff --git a/docs/configuration/quota.md b/docs/configuration/quota.md
index 9892db75c..5537de934 100644
--- a/docs/configuration/quota.md
+++ b/docs/configuration/quota.md
@@ -21,7 +21,7 @@ license: |
| --- | ------- | ----------- | ----- |
| celeborn.quota.configuration.path | <undefined> | Quota configuration
file path. The file format should be yaml. Quota configuration file template
can be found under conf directory. | 0.2.0 |
| celeborn.quota.enabled | true | When true, before registering shuffle,
LifecycleManager should check if current user have enough quota space, if
cluster don't have enough quota space for current user, fallback to Spark's
default shuffle | 0.2.0 |
-| celeborn.quota.identity.provider |
org.apache.celeborn.common.identity.DefaultIdentityProvider | IdentityProvider
class name. Default class is
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values:
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will
be obtained by UserGroupInformation.getUserName;
org.apache.celeborn.common.identity.DefaultIdentityProvider uesr name and
tenant id are default values or user-specific values. | 0.2.0 |
+| celeborn.quota.identity.provider |
org.apache.celeborn.common.identity.DefaultIdentityProvider | IdentityProvider
class name. Default class is
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values:
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will
be obtained by UserGroupInformation.getUserName;
org.apache.celeborn.common.identity.DefaultIdentityProvider user name and
tenant id are default values or user-specific values. | 0.2.0 |
| celeborn.quota.identity.user-specific.tenant | default | Tenant id if
celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 |
| celeborn.quota.identity.user-specific.userName | default | User name if
celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 |
| celeborn.quota.manager |
org.apache.celeborn.common.quota.DefaultQuotaManager | QuotaManger class name.
Default class is `org.apache.celeborn.common.quota.DefaultQuotaManager`. |
0.2.0 |
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 92db7c922..b2ff46162 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -115,7 +115,7 @@ These metrics are exposed by Celeborn master.
- namespace=ResourceConsumption
- **notes:**
- - This merics data is generated for each user and they are identified
using a metric tag.
+ - This metrics data is generated for each user and they are identified
using a metric tag.
- diskFileCount
- diskBytesWritten
- hdfsFileCount
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index f20cfff16..73149bc01 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -94,7 +94,7 @@ public class HARaftServer {
private long appTimeoutDeadline;
/**
- * Returns an Master Ratis server.
+ * Returns a Master Ratis server.
*
* @param conf configuration
* @param localRaftPeerId raft peer id of this Ratis server
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
index 7a4a2f1ab..5b3582f61 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
@@ -328,7 +328,7 @@ public class StateMachine extends BaseStateMachine {
}
/**
- * Store the current state as an snapshot file in the stateMachineStorage.
+ * Store the current state as a snapshot file in the stateMachineStorage.
*
* @return the index of the snapshot
*/
diff --git
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
index 3549e72d5..625debb4c 100644
---
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
+++
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
@@ -29,7 +29,7 @@ import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature
with HeartbeatFeature
with BeforeAndAfterAll with BeforeAndAfterEach {
- test("celeborn flink hearbeat test - client <- worker") {
+ test("celeborn flink heartbeat test - client <- worker") {
val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf
val flinkShuffleClientImpl =
new FlinkShuffleClientImpl(
@@ -44,7 +44,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with
MiniClusterFeature wit
testHeartbeatFromWorker2Client(flinkShuffleClientImpl.getDataClientFactory)
}
- test("celeborn flink hearbeat test - client <- worker no heartbeat") {
+ test("celeborn flink heartbeat test - client <- worker no heartbeat") {
val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
val flinkShuffleClientImpl =
new FlinkShuffleClientImpl(
@@ -59,7 +59,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with
MiniClusterFeature wit
testHeartbeatFromWorker2ClientWithNoHeartbeat(flinkShuffleClientImpl.getDataClientFactory)
}
- test("celeborn flink hearbeat test - client <- worker timeout") {
+ test("celeborn flink heartbeat test - client <- worker timeout") {
val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
val flinkShuffleClientImpl =
new FlinkShuffleClientImpl(
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
index abfd2cb39..d9fbdb5a3 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
@@ -254,7 +254,7 @@ public class MapDataPartitionReader implements
Comparable<MapDataPartitionReader
String filename, FileChannel channel, ByteBuffer header, ByteBuf buffer,
int headerSize)
throws IOException {
readHeaderOrIndexBuffer(channel, header, headerSize);
- // header is combined of mapId(4),attemptId(4),nextBatchId(4) and total
Compresszed Length(4)
+ // header is combined of mapId(4),attemptId(4),nextBatchId(4) and total
Compressed Length(4)
// we need size here,so we read length directly
int bufferLength = header.getInt(12);
if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index fe125cd41..bb54a2f8c 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -175,9 +175,9 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
if (location == null) {
val (mapId, attemptId) = getMapAttempt(body)
// MapperAttempts for a shuffle exists after any CommitFiles request
succeeds.
- // A shuffle can trigger multiple CommitFiles requests, for reasons
like: Hard-Split happens, StageEnd.
+ // A shuffle can trigger multiple CommitFiles requests, for reasons
like: HARD_SPLIT happens, StageEnd.
// If MapperAttempts but the value is -1 for the mapId(-1 means the map
has not yet finished),
- // it's probably because commitFiles for Had-Split happens.
+ // it's probably because commitFiles for HARD_SPLIT happens.
if (shuffleMapperAttempts.containsKey(shuffleKey)) {
if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
// partition data has already been committed
@@ -195,7 +195,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
if (storageManager.shuffleKeySet().contains(shuffleKey)) {
// If there is no shuffle key in shuffleMapperAttempts but there is
shuffle key
// in StorageManager. This partition should be HARD_SPLIT partition
and
- // after worker restart, some task still push data to this
HARD_SPLIT partition.
+ // after worker restart, some tasks still push data to this
HARD_SPLIT partition.
logInfo(s"[Case2] Receive push data for committed hard split
partition of " +
s"(shuffle $shuffleKey, map $mapId attempt $attemptId)")
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
@@ -437,9 +437,9 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
if (loc == null) {
val (mapId, attemptId) = getMapAttempt(body)
// MapperAttempts for a shuffle exists after any CommitFiles request
succeeds.
- // A shuffle can trigger multiple CommitFiles requests, for reasons
like: Hard-Split happens, StageEnd.
+ // A shuffle can trigger multiple CommitFiles requests, for reasons
like: HARD_SPLIT happens, StageEnd.
// If MapperAttempts but the value is -1 for the mapId(-1 means the
map has not yet finished),
- // it's probably because commitFiles for Had-Split happens.
+ // it's probably because commitFiles for HARD_SPLIT happens.
if (shuffleMapperAttempts.containsKey(shuffleKey)) {
if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
logInfo(s"Receive push merged data from speculative " +
@@ -457,7 +457,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
if (storageManager.shuffleKeySet().contains(shuffleKey)) {
// If there is no shuffle key in shuffleMapperAttempts but there
is shuffle key
// in StorageManager. This partition should be HARD_SPLIT
partition and
- // after worker restart, some task still push data to this
HARD_SPLIT partition.
+ // after worker restart, some tasks still push data to this
HARD_SPLIT partition.
logInfo(s"Receive push merged data for committed hard split
partition of " +
s"(shuffle $shuffleKey, map $mapId attempt $attemptId)")
callbackWithTimer.onSuccess(