This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new c7300f3766 [SYSTEMDS-3934] Improved partition handling of OOC eviction
manager
c7300f3766 is described below
commit c7300f3766be01b0df9fef2ac99eaa2c41d5110e
Author: Janardhan Pulivarthi <[email protected]>
AuthorDate: Sun Nov 9 09:37:33 2025 +0100
[SYSTEMDS-3934] Improved partition handling of OOC eviction manager
Closes #2350.
---
.../instructions/ooc/OOCEvictionManager.java | 376 +++++++++++++++++----
.../sysds/runtime/util/PartitionFileManager.java | 27 ++
2 files changed, 340 insertions(+), 63 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
index 87984da883..1d47ac10dc 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
@@ -21,14 +21,29 @@ package org.apache.sysds.runtime.instructions.ooc;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.util.FastBufferedDataOutputStream;
import org.apache.sysds.runtime.util.LocalFileUtils;
+import java.io.DataInputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Eviction Manager for the Out-Of-Core stream cache
@@ -69,14 +84,28 @@ import java.util.Map;
public class OOCEvictionManager {
// Configuration: OOC buffer limit as percentage of heap
- private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap
+ private static final double OOC_BUFFER_PERCENTAGE = 0.15 * 0.01 * 2; //
15% of heap
+
+ private static final double PARTITION_EVICTION_SIZE = 64 * 1024 * 1024;
// 64 MB
// Memory limit for ByteBuffers
private static long _limit;
- private static long _size;
+ private static final AtomicLong _size = new AtomicLong(0);
// Cache structures: map key -> MatrixBlock and eviction deque
(head=oldest block)
- private static LinkedHashMap<String, IndexedMatrixValue> _cache = new
LinkedHashMap<>();
+ private static LinkedHashMap<String, BlockEntry> _cache = new
LinkedHashMap<>();
+
+ // Spill related structures
+ private static ConcurrentHashMap<String, spillLocation> _spillLocations
= new ConcurrentHashMap<>();
+ private static ConcurrentHashMap<Integer, partitionFile> _partitions =
new ConcurrentHashMap<>();
+ private static final AtomicInteger _partitionCounter = new
AtomicInteger(0);
+
+ // Track which partitions belong to which stream (for cleanup)
+ private static final ConcurrentHashMap<Long, Set<String>>
_streamPartitions = new ConcurrentHashMap<>();
+
+
+ // Cache level lock
+ private static final Object _cacheLock = new Object();
// Spill directory for evicted blocks
private static String _spillDir;
@@ -86,9 +115,57 @@ public class OOCEvictionManager {
}
private static RPolicy _policy = RPolicy.FIFO;
+ private enum BlockState {
+ HOT, // In-memory
+ EVICTING, // Being written to disk (transition state)
+ COLD // On disk
+ }
+
+ private static class spillLocation {
+ // structure of spillLocation: file, offset
+ final int partitionId;
+ final long offset;
+
+ spillLocation(int partitionId, long offset) {
+
+ this.partitionId = partitionId;
+ this.offset = offset;
+ }
+ }
+
+ private static class partitionFile {
+ final String filePath;
+ final long streamId;
+
+
+ private partitionFile(String filePath, long streamId) {
+ this.filePath = filePath;
+ this.streamId = streamId;
+ }
+ }
+
+ // Per-block state container with own lock.
+ private static class BlockEntry {
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition stateUpdate = lock.newCondition();
+
+ private BlockState state = BlockState.HOT;
+ private IndexedMatrixValue value;
+ private final long streamId;
+ private final int blockId;
+ private final long size;
+
+ BlockEntry(IndexedMatrixValue value, long streamId, int
blockId, long size) {
+ this.value = value;
+ this.streamId = streamId;
+ this.blockId = blockId;
+ this.size = size;
+ }
+ }
+
static {
_limit = (long)(Runtime.getRuntime().maxMemory() *
OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap
- _size = 0;
+ _size.set(0);
_spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream");
LocalFileUtils.createLocalFileIfNotExist(_spillDir);
}
@@ -96,72 +173,195 @@ public class OOCEvictionManager {
/**
* Store a block in the OOC cache (serialize once)
*/
- public static synchronized void put(long streamId, int blockId,
IndexedMatrixValue value) {
+ public static void put(long streamId, int blockId, IndexedMatrixValue
value) {
MatrixBlock mb = (MatrixBlock) value.getValue();
long size = estimateSerializedSize(mb);
String key = streamId + "_" + blockId;
- IndexedMatrixValue old = _cache.remove(key); // remove old value
+ BlockEntry newEntry = new BlockEntry(value, streamId, blockId,
size);
+ BlockEntry old;
+ synchronized (_cacheLock) {
+ old = _cache.put(key, newEntry); // remove old value,
put new value
+ }
+
+ // Handle replacement with a new lock
if (old != null) {
- _size -= estimateSerializedSize((MatrixBlock)
old.getValue());
+ old.lock.lock();
+ try {
+ if (old.state == BlockState.HOT) {
+ _size.addAndGet(-old.size); // read and
update size in atomic operation
+ }
+ } finally {
+ old.lock.unlock();
+ }
}
+ _size.addAndGet(size);
//make room if needed
- evict(size);
-
- _cache.put(key, value); // put new value last
- _size += size;
+ evict();
}
/**
* Get a block from the OOC cache (deserialize on read)
*/
- public static synchronized IndexedMatrixValue get(long streamId, int
blockId) {
+ public static IndexedMatrixValue get(long streamId, int blockId) {
String key = streamId + "_" + blockId;
- IndexedMatrixValue imv = _cache.get(key);
+ BlockEntry imv;
+
+ synchronized (_cacheLock) {
+ imv = _cache.get(key);
+ System.err.println( "value of imv: " + imv);
+ if (imv != null && _policy == RPolicy.LRU) {
+ _cache.remove(key);
+ _cache.put(key, imv); //add last semantic
+ }
+ }
+
+ if (imv == null) {
+ throw new DMLRuntimeException("Block not found in
cache: " + key);
+ }
+ // use lock and check state
+ imv.lock.lock();
+ try {
+ // 1. wait for eviction to complete
+ while (imv.state == BlockState.EVICTING) {
+ try {
+ imv.stateUpdate.await();
+ } catch (InterruptedException e) {
+
+ throw new DMLRuntimeException(e);
+ }
+ }
+
+ // 2. check if the block is in HOT
+ if (imv.state == BlockState.HOT) {
+ return imv.value;
+ }
- if (imv != null && _policy == RPolicy.LRU) {
- _cache.remove(key);
- _cache.put(key, imv); //add last semantic
+ } finally {
+ imv.lock.unlock();
}
-
- //restore if needed
- return (imv.getValue() != null) ? imv :
- loadFromDisk(streamId, blockId);
+
+ // restore, since the block is COLD
+ return loadFromDisk(streamId, blockId);
}
/**
* Evict ByteBuffers to disk
*/
- private static void evict(long requiredSize) {
+ private static void evict() {
+ long currentSize = _size.get();
+ if (_size.get() <= _limit) { // only trigger eviction, if
filled.
+ System.err.println("Evicting condition: " + _size.get()
+ "/" + _limit);
+ return;
+ }
+
+ // --- 1. COLLECTION PHASE ---
+ long totalFreedSize = 0;
+ // list of eviction candidates
+ List<Map.Entry<String,BlockEntry>> candidates = new
ArrayList<>();
+ long targetFreedSize = Math.max(currentSize - _limit, (long)
PARTITION_EVICTION_SIZE);
+
+ synchronized (_cacheLock) {
+
+ //move iterator to first entry
+ Iterator<Map.Entry<String, BlockEntry>> iter =
_cache.entrySet().iterator();
+
+ while (iter.hasNext() && totalFreedSize <
targetFreedSize) {
+ Map.Entry<String, BlockEntry> e = iter.next();
+ BlockEntry entry = e.getValue();
+
+ if (entry.lock.tryLock()) {
+ try {
+ if (entry.state ==
BlockState.HOT) {
+ entry.state =
BlockState.EVICTING;
+ candidates.add(e);
+ totalFreedSize +=
entry.size;
+
+ //remove current
iterator entry
+// iter.remove();
+ }
+ } finally {
+ entry.lock.unlock();
+ }
+ } // if tryLock() fails, it means a thread is
loading/reading this block. we shall skip it.
+ }
+
+ }
+
+ if (candidates.isEmpty()) { return; } // no eviction candidates
found
+
+ // --- 2. WRITE PHASE ---
+ // write to partition file
+ // 1. generate a new ID for the present "partition" (file)
+ int partitionId = _partitionCounter.getAndIncrement();
+
+ // Spill to disk
+ String filename = _spillDir + "/stream_batch_part_" +
partitionId;
+ File spillDirFile = new File(_spillDir);
+ if (!spillDirFile.exists()) {
+ spillDirFile.mkdirs();
+ }
+
+ // 2. create the partition file metadata
+ partitionFile partFile = new partitionFile(filename, 0);
+ _partitions.put(partitionId, partFile);
+
+ FileOutputStream fos = null;
+ FastBufferedDataOutputStream dos = null;
try {
- int pos = 0;
- while(_size + requiredSize > _limit && pos++ <
_cache.size()) {
- //System.out.println("BUFFER:
"+_size+"/"+_limit+" size="+_cache.size());
- Map.Entry<String,IndexedMatrixValue> tmp =
removeFirstFromCache();
- if( tmp == null || tmp.getValue().getValue() ==
null ) {
- if( tmp != null )
- _cache.put(tmp.getKey(),
tmp.getValue());
- continue;
+ fos = new FileOutputStream(filename);
+ dos = new FastBufferedDataOutputStream(fos);
+
+
+ // loop over the list of blocks we collected
+ for (Map.Entry<String,BlockEntry> tmp : candidates) {
+ BlockEntry entry = tmp.getValue();
+
+ // 1. get the current file position. this is
the offset.
+ // flush any buffered data to the file
+ dos.flush();
+ long offset = fos.getChannel().position();
+
+ // 2. write indexes and block
+ entry.value.getIndexes().write(dos); // write
Indexes
+ entry.value.getValue().write(dos);
+ System.out.println("written, partition id: " +
_partitions.get(partitionId) + ", offset: " + offset);
+
+ // 3. create the spillLocation
+ spillLocation sloc = new
spillLocation(partitionId, offset);
+ _spillLocations.put(tmp.getKey(), sloc);
+
+ // 4. track file for cleanup
+ _streamPartitions
+
.computeIfAbsent(entry.streamId, k -> ConcurrentHashMap.newKeySet())
+ .add(filename);
+
+ // 5. change state to COLD
+ entry.lock.lock();
+ try {
+ entry.value = null; // only release
ref, don't mutate object
+ entry.state = BlockState.COLD; // set
state to cold, since writing to disk
+ entry.stateUpdate.signalAll(); // wake
up any "get()" threads
+ } finally {
+ entry.lock.unlock();
}
-
- // Spill to disk
- String filename = _spillDir + "/" +
tmp.getKey();
- File spillDirFile = new File(_spillDir);
- if (!spillDirFile.exists()) {
- spillDirFile.mkdirs();
+
+ synchronized (_cacheLock) {
+ _cache.put(tmp.getKey(), entry); // add
last semantic
}
-
LocalFileUtils.writeMatrixBlockToLocal(filename,
(MatrixBlock)tmp.getValue().getValue());
-
- // Evict from memory
- long freedSize =
estimateSerializedSize((MatrixBlock)tmp.getValue().getValue());
- tmp.getValue().setValue(null);
- _cache.put(tmp.getKey(), tmp.getValue()); //
add last semantic
- _size -= freedSize;
}
}
- catch(Exception ex) {
+ catch(IOException ex) {
throw new DMLRuntimeException(ex);
+ } finally {
+ IOUtilFunctions.closeSilently(dos);
+ IOUtilFunctions.closeSilently(fos);
+ }
+
+ // --- 3. ACCOUNTING PHASE ---
+ if (totalFreedSize > 0) { // note the size, without evicted
blocks
+ _size.addAndGet(-totalFreedSize);
}
}
@@ -170,37 +370,87 @@ public class OOCEvictionManager {
*/
private static IndexedMatrixValue loadFromDisk(long streamId, int
blockId) {
String key = streamId + "_" + blockId;
- String filename = _spillDir + "/" + key;
- try {
- // check if file exists
- if (!LocalFileUtils.isExisting(filename)) {
- throw new IOException("File " + filename + "
does not exist");
+ // 1. find the blocks address (spill location)
+ spillLocation sloc = _spillLocations.get(key);
+ if (sloc == null) {
+ throw new DMLRuntimeException("Failed to load spill
location for: " + key);
+ }
+
+ partitionFile partFile = _partitions.get(sloc.partitionId);
+ if (partFile == null) {
+ throw new DMLRuntimeException("Failed to load partition
for: " + sloc.partitionId);
+ }
+
+ String filename = partFile.filePath;
+
+ // Create an empty object to read data into.
+ MatrixIndexes ix = new MatrixIndexes();
+ MatrixBlock mb = new MatrixBlock();
+
+ try (RandomAccessFile raf = new RandomAccessFile(filename,
"r")) {
+ raf.seek(sloc.offset);
+
+ try {
+ DataInputStream dis = new
DataInputStream(Channels.newInputStream(raf.getChannel()));
+ ix.readFields(dis); // 1. Read Indexes
+ mb.readFields(dis); // 2. Read Block
+ } catch (IOException ex) {
+ throw new DMLRuntimeException("Failed to load
block " + key + " from " + filename, ex);
}
-
- // Read from disk and put into original indexed matrix
value
- MatrixBlock mb =
LocalFileUtils.readMatrixBlockFromLocal(filename);
- IndexedMatrixValue imv = _cache.get(key);
- imv.setValue(mb);
- return imv;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- catch(Exception ex) {
- throw new DMLRuntimeException(ex);
+ // Read from disk and put into original indexed matrix value
+ BlockEntry imvCacheEntry;
+ synchronized (_cacheLock) {
+ imvCacheEntry = _cache.get(key);
+ }
+
+ // 2. Check if it's null (the bug you helped fix before)
+ if(imvCacheEntry == null) {
+ throw new DMLRuntimeException("Block entry " + key + "
was not in cache during load.");
}
+
+ imvCacheEntry.lock.lock();
+ try {
+ if (imvCacheEntry.state == BlockState.COLD) {
+ imvCacheEntry.value = new
IndexedMatrixValue(ix, mb);
+ imvCacheEntry.state = BlockState.HOT;
+ _size.addAndGet(imvCacheEntry.size);
+
+ synchronized (_cacheLock) {
+ _cache.remove(key);
+ _cache.put(key, imvCacheEntry);
+ }
+ }
+
+// evict(); // when we add the block, we shall check for
limit.
+ } finally {
+ imvCacheEntry.lock.unlock();
+ }
+
+ return imvCacheEntry.value;
}
private static long estimateSerializedSize(MatrixBlock mb) {
return mb.getExactSerializedSize();
}
- private static Map.Entry<String, IndexedMatrixValue>
removeFirstFromCache() {
- //move iterator to first entry
- Iterator<Map.Entry<String, IndexedMatrixValue>> iter =
_cache.entrySet().iterator();
- Map.Entry<String, IndexedMatrixValue> entry = iter.next();
+ private static Map.Entry<String, BlockEntry> removeFirstFromCache() {
+ synchronized (_cacheLock) {
- //remove current iterator entry
- iter.remove();
+ if (_cache.isEmpty()) {
+ return null;
+ }
+ //move iterator to first entry
+ Iterator<Map.Entry<String, BlockEntry>> iter =
_cache.entrySet().iterator();
+ Map.Entry<String, BlockEntry> entry = iter.next();
- return entry;
+ //remove current iterator entry
+ iter.remove();
+
+ return entry;
+ }
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java
b/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java
new file mode 100644
index 0000000000..361e2ea68f
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.util;
+
+/**
+ * reference-counted file manager for SystemDS partition files.
+ * Prevents deletion while concurrent readers are active.
+ */
+public class PartitionFileManager {
+}