maobaolong commented on code in PR #1566:
URL:
https://github.com/apache/incubator-uniffle/pull/1566#discussion_r1732687943
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java:
##########
@@ -50,6 +56,213 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
   private Method unregisterAllMapOutputMethod;
   private Method registerShuffleMethod;
+  /** See static overload of this method. */
+  public abstract void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf);
+
+  /**
+   * Derives block id layout config from maximum number of allowed partitions. This value can be set
+   * in either SparkConf or RssConf via RssSparkConfig.RSS_MAX_PARTITIONS, where SparkConf has
+   * precedence.
+   *
+   * <p>Computes the number of required bits for partition id and task attempt id and reserves
+   * remaining bits for sequence number. Adds RssClientConf.BLOCKID_SEQUENCE_NO_BITS,
+   * RssClientConf.BLOCKID_PARTITION_ID_BITS, and RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS to the
+   * given RssConf and adds them prefixed with "spark." to the given SparkConf.
+   *
+   * <p>If RssSparkConfig.RSS_MAX_PARTITIONS is not set, given values for
+   * RssClientConf.BLOCKID_SEQUENCE_NO_BITS, RssClientConf.BLOCKID_PARTITION_ID_BITS, and
+   * RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS are copied.
+   *
+   * <p>Then, BlockIdLayout can consistently be created from both configs:
+   *
+   * <p>BlockIdLayout.from(rssConf) BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf))
+   *
+   * @param sparkConf Spark config providing max partitions
+   * @param rssConf Rss config to amend
+   * @param maxFailures Spark max failures
+   * @param speculation Spark speculative execution
+   */
+  @VisibleForTesting
+  protected static void configureBlockIdLayout(
+      SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+    if (sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key())) {
+      configureBlockIdLayoutFromMaxPartitions(sparkConf, rssConf, maxFailures, speculation);
+    } else {
+      configureBlockIdLayoutFromLayoutConfig(sparkConf, rssConf, maxFailures, speculation);
+    }
+  }
+
+  private static void configureBlockIdLayoutFromMaxPartitions(
+      SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+    int maxPartitions =
+        sparkConf.getInt(
+            RssSparkConfig.RSS_MAX_PARTITIONS.key(),
+            RssSparkConfig.RSS_MAX_PARTITIONS.defaultValue().get());
+    if (maxPartitions <= 1) {
+      throw new IllegalArgumentException(
+          "Value of "
+              + RssSparkConfig.RSS_MAX_PARTITIONS.key()
+              + " must be larger than 1: "
+              + maxPartitions);
+    }
+
+    int attemptIdBits = getAttemptIdBits(getMaxAttemptNo(maxFailures, speculation));
+    int partitionIdBits = 32 - Integer.numberOfLeadingZeros(maxPartitions - 1); // [1..31]
+    int taskAttemptIdBits = partitionIdBits + attemptIdBits; // [1+attemptIdBits..31+attemptIdBits]
Review Comment:
Hi @EnricoMi, could you please explain why `taskAttemptIdBits` equals
`partitionIdBits + attemptIdBits`? And why is `partitionIdBits` computed as
`32 - Integer.numberOfLeadingZeros(maxPartitions - 1)`?
--
This is an automated message from the Apache Git Service.