marin-ma commented on code in PR #11722:
URL: https://github.com/apache/gluten/pull/11722#discussion_r2913163731


##########
cpp/core/shuffle/LocalPartitionWriter.cc:
##########
@@ -562,6 +583,56 @@ void LocalPartitionWriter::init() {
   std::default_random_engine engine(rd());
   std::shuffle(localDirs_.begin(), localDirs_.end(), engine);
   subDirSelection_.assign(localDirs_.size(), 0);
+
+  if (!indexFile_.empty()) {
+    usePartitionMultipleSegments_ = true;
+    partitionSegments_.resize(numPartitions_);
+  }
+}
+
+// Helper for big-endian conversion (network order)
+#include <arpa/inet.h>
+static uint64_t htonll(uint64_t value) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  return (((uint64_t)htonl(value & 0xFFFFFFFFULL)) << 32) | htonl(value >> 32);
+#else
+  return value;
+#endif
+}
+
+arrow::Status LocalPartitionWriter::writeIndexFile() {

Review Comment:
   Can you add some C++ unit tests for the multi-segment partition write?



##########
backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala:
##########
@@ -52,6 +55,18 @@ class ColumnarShuffleWriter[K, V](
          s"expected one of: ${HashShuffleWriterType.name}, ${SortShuffleWriterType.name}")
   }
 
+  shuffleBlockResolver match {
+    case resolver: ColumnarIndexShuffleBlockResolver =>
+      if (resolver.canUseNewFormat() && !GlutenConfig.get.columnarShuffleEnableDictionary) {
+        // For Dictionary encoding, the dict only finalizes after all batches are processed,
+        // and dict is required to saved at the head of the partition data.
+        // So we cannot use multiple segments to save partition data incrementally.
+        partitionUseMultipleSegments = true

Review Comment:
   Please add a configuration to enable this feature.



##########
cpp/core/shuffle/LocalPartitionWriter.cc:
##########
@@ -759,6 +881,12 @@ arrow::Status
 LocalPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal, int64_t& evictBytes) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
 
+  if (usePartitionMultipleSegments_) {
+    // If multiple segments per partition is enabled, write directly to the final data file.
+    RETURN_NOT_OK(writeMemoryPayload(partitionId, std::move(inMemoryPayload)));

Review Comment:
   Can you explain a bit more about how this reduces memory usage? It looks 
like memory is still only reclaimed on OOM and spilling.



##########
cpp/core/shuffle/LocalPartitionWriter.cc:
##########
@@ -562,6 +583,56 @@ void LocalPartitionWriter::init() {
   std::default_random_engine engine(rd());
   std::shuffle(localDirs_.begin(), localDirs_.end(), engine);
   subDirSelection_.assign(localDirs_.size(), 0);
+
+  if (!indexFile_.empty()) {
+    usePartitionMultipleSegments_ = true;
+    partitionSegments_.resize(numPartitions_);
+  }
+}
+
+// Helper for big-endian conversion (network order)
+#include <arpa/inet.h>

Review Comment:
   Please move the header to the top of the file, and move `htonll` into an 
anonymous namespace after the headers.



##########
cpp/core/shuffle/LocalPartitionWriter.cc:
##########
@@ -750,6 +869,9 @@ arrow::Status LocalPartitionWriter::hashEvict(
     for (auto& payload : merged) {
       RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
     }
+    if (usePartitionMultipleSegments_) {
+      RETURN_NOT_OK(flushCachedPayloads());

Review Comment:
   `hashEvict` is not only called for spilling. When the evictType is 
`kCache`, it tries to cache as many payloads in memory as possible to reduce 
spilling. 
   
   When the evictType is `kSpill`, the data is written to a spilled 
data file. Both evict types can occur in the same job. Is `evictType == kSpill` 
handled properly for multi-segment writes? 
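   
   To make the question concrete, here is a toy model of the two paths. 
Everything in it is hypothetical (`EvictType`, `SketchWriter`, and the string 
payloads are stand-ins, not the Gluten classes), and routing `kSpill` to a 
separate spill list is only one possible behavior, shown to illustrate what 
needs to be decided:

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the evict type discussed above.
enum class EvictType { kCache, kSpill };

// Toy writer: strings stand in for payloads, vectors stand in for files.
struct SketchWriter {
  bool useMultipleSegments = false;
  std::vector<std::string> cached;    // payloads held in memory
  std::vector<std::string> spilled;   // payloads written to a spill file
  std::vector<std::string> finalFile; // segments appended to the final data file

  void hashEvict(std::string payload, EvictType type) {
    if (type == EvictType::kSpill) {
      // Assumed behavior: spilled data bypasses the cache and the final file.
      spilled.push_back(std::move(payload));
      return;
    }
    cached.push_back(std::move(payload));
    if (useMultipleSegments) {
      // Mirrors the added flush: cached payloads go straight to the final file.
      flushCachedPayloads();
    }
  }

  void flushCachedPayloads() {
    for (auto& p : cached) {
      finalFile.push_back(std::move(p));
    }
    cached.clear();
  }
};
```

   In this model a job that mixes both evict types ends up with data in both 
`finalFile` and `spilled`, so the final write would still need to merge the 
spill into the per-partition segments.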



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

