EnricoMi commented on code in PR #1566:
URL: https://github.com/apache/incubator-uniffle/pull/1566#discussion_r1735965051
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java:
##########
@@ -50,6 +56,213 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
private Method unregisterAllMapOutputMethod;
private Method registerShuffleMethod;
+ /** See static overload of this method. */
+ public abstract void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf);
+
+ /**
+ * Derives block id layout config from the maximum number of allowed partitions. This value can be set
+ * in either SparkConf or RssConf via RssSparkConfig.RSS_MAX_PARTITIONS, where SparkConf has
+ * precedence.
+ *
+ * <p>Computes the number of required bits for partition id and task attempt id and reserves
+ * remaining bits for the sequence number. Adds RssClientConf.BLOCKID_SEQUENCE_NO_BITS,
+ * RssClientConf.BLOCKID_PARTITION_ID_BITS, and RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS to the
+ * given RssConf and adds them prefixed with "spark." to the given SparkConf.
+ *
+ * <p>If RssSparkConfig.RSS_MAX_PARTITIONS is not set, the given values for
+ * RssClientConf.BLOCKID_SEQUENCE_NO_BITS, RssClientConf.BLOCKID_PARTITION_ID_BITS, and
+ * RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS are copied.
+ *
+ * <p>Then, BlockIdLayout can consistently be created from both configs:
+ *
+ * <p>BlockIdLayout.from(rssConf) and BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf))
+ *
+ * @param sparkConf Spark config providing max partitions
+ * @param rssConf Rss config to amend
+ * @param maxFailures Spark max failures
+ * @param speculation Spark speculative execution
+ */
+ @VisibleForTesting
+ protected static void configureBlockIdLayout(
+ SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+ if (sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key())) {
+ configureBlockIdLayoutFromMaxPartitions(sparkConf, rssConf, maxFailures, speculation);
+ } else {
+ configureBlockIdLayoutFromLayoutConfig(sparkConf, rssConf, maxFailures, speculation);
+ }
+ }
+
+ private static void configureBlockIdLayoutFromMaxPartitions(
+ SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+ int maxPartitions =
+ sparkConf.getInt(
+ RssSparkConfig.RSS_MAX_PARTITIONS.key(),
+ RssSparkConfig.RSS_MAX_PARTITIONS.defaultValue().get());
+ if (maxPartitions <= 1) {
+ throw new IllegalArgumentException(
+ "Value of "
+ + RssSparkConfig.RSS_MAX_PARTITIONS.key()
+ + " must be larger than 1: "
+ + maxPartitions);
+ }
+
+ int attemptIdBits = getAttemptIdBits(getMaxAttemptNo(maxFailures, speculation));
+ int partitionIdBits = 32 - Integer.numberOfLeadingZeros(maxPartitions - 1); // [1..31]
+ int taskAttemptIdBits = partitionIdBits + attemptIdBits; // [1+attemptIdBits..31+attemptIdBits]
Review Comment:
The `taskAttemptId` consists of the `partitionId` and the `attemptId`. Each
partition is attempted at most `maxFailures` times (plus one more attempt if
speculation is enabled), and these attempts have attempt ids `[0, 1, ...,
maxFailures - 1]`. This way we can produce a unique `taskAttemptId` that
requires fewer bits than the Spark task attempt id, which is monotonically
increasing across all stages and therefore requires many more bits and causes
task attempt id overflows.
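
A rough sketch of that packing (illustrative only; `packTaskAttemptId` is not one of the PR's actual helpers, and the `getAttemptIdBits` semantics are assumed):

```java
class TaskAttemptIdSketch {
  // Pack the partition id and the per-partition attempt number into one
  // compact task attempt id, as described above.
  static long packTaskAttemptId(int partitionId, int attemptNo, int maxFailures, boolean speculation) {
    // attempt numbers are [0 .. maxFailures - 1], plus one extra value when speculation is enabled
    int maxAttemptNo = maxFailures - 1 + (speculation ? 1 : 0);
    // bits needed to encode values 0..maxAttemptNo
    int attemptIdBits = 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
    // partition id in the high bits, attempt number in the low bits
    return ((long) partitionId << attemptIdBits) | attemptNo;
  }

  public static void main(String[] args) {
    // maxFailures = 4 with speculation -> 3 attempt-id bits;
    // partition 5, attempt 1 packs to 0b101_001 = 41
    System.out.println(packTaskAttemptId(5, 1, 4, true));
  }
}
```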
The expression `32 - Integer.numberOfLeadingZeros(maxPartitions - 1)` computes
the number of bits required to encode partition ids that are upper bounded by
`maxPartitions`. With `maxPartitions = 8`, this evaluates to `3`.
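
As a quick sanity check of that expression (not part of the PR, runnable in jshell):

```java
for (int maxPartitions : new int[] {2, 8, 9, 1024, 1 << 20}) {
  int partitionIdBits = 32 - Integer.numberOfLeadingZeros(maxPartitions - 1);
  System.out.println(maxPartitions + " partitions need " + partitionIdBits + " partition id bits");
}
// 2 -> 1, 8 -> 3, 9 -> 4, 1024 -> 10, 1048576 -> 20
```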
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]