This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new b0ef87544e [SYSTEMDS-3931] Out-of-core right indexing operations
b0ef87544e is described below

commit b0ef87544ec38384ab2d41aa1d2a387fbf7b7a36
Author: Jannik Lindemann <[email protected]>
AuthorDate: Sun Nov 16 11:21:46 2025 +0100

    [SYSTEMDS-3931] Out-of-core right indexing operations
    
    Closes #2351.
---
 .../runtime/instructions/OOCInstructionParser.java |   7 +-
 .../runtime/instructions/ooc/CachingStream.java    |  45 +--
 .../instructions/ooc/IndexingOOCInstruction.java   | 338 +++++++++++++++++++++
 .../ooc/MatrixIndexingOOCInstruction.java          | 254 ++++++++++++++++
 .../instructions/ooc/SubscribableTaskQueue.java    |   2 +-
 .../instructions/ooc/UnaryOOCInstruction.java      |   6 +
 .../org/apache/sysds/runtime/util/IndexRange.java  |  12 +
 .../functions/ooc/RightIndexingCoordsTest.java     | 133 ++++++++
 .../test/functions/ooc/RightIndexingTest.java      | 215 +++++++++++++
 src/test/scripts/functions/ooc/RightIndexing.dml   |  31 ++
 .../scripts/functions/ooc/RightIndexingCoords.dml  |  29 ++
 11 files changed, 1047 insertions(+), 25 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java 
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
index a14079160f..938615eb9c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
@@ -28,6 +28,7 @@ import 
org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.CSVReblockOOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.CentralMomentOOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction;
+import org.apache.sysds.runtime.instructions.ooc.IndexingOOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.OOCInstruction;
 import 
org.apache.sysds.runtime.instructions.ooc.ParameterizedBuiltinOOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction;
@@ -75,12 +76,14 @@ public class OOCInstructionParser extends InstructionParser 
{
                                return 
TransposeOOCInstruction.parseInstruction(str);
                        case Tee:
                                return TeeOOCInstruction.parseInstruction(str);
-            case CentralMoment:
-                return  CentralMomentOOCInstruction.parseInstruction(str);
+                       case CentralMoment:
+                               return  
CentralMomentOOCInstruction.parseInstruction(str);
                        case Ctable:
                                return 
CtableOOCInstruction.parseInstruction(str);
                        case ParameterizedBuiltin:
                                return 
ParameterizedBuiltinOOCInstruction.parseInstruction(str);
+                       case MatrixIndexing:
+                               return 
IndexingOOCInstruction.parseInstruction(str);
 
                        default:
                                throw new DMLRuntimeException("Invalid OOC 
Instruction Type: " + ooctype);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java
index 1a54030280..b74f7ed5e1 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java
@@ -80,36 +80,37 @@ public class CachingStream implements 
OOCStreamable<IndexedMatrixValue> {
                });
        }
 
-       private boolean fetchFromStream() throws InterruptedException {
-               synchronized (this) {
-                       if(!_cacheInProgress)
-                               throw new DMLRuntimeException("Stream is 
closed");
-               }
+       private synchronized boolean fetchFromStream() throws 
InterruptedException {
+               if(!_cacheInProgress)
+                       throw new DMLRuntimeException("Stream is closed");
 
                IndexedMatrixValue task = _source.dequeue();
 
-               synchronized (this) {
-                       if(task != LocalTaskQueue.NO_MORE_TASKS) {
-                               OOCEvictionManager.put(_streamId, _numBlocks, 
task);
-                               if (_index != null)
-                                       _index.put(task.getIndexes(), 
_numBlocks);
-                               _numBlocks++;
-                               notifyAll();
-                               return false;
-                       }
-                       else {
-                               _cacheInProgress = false; // caching is complete
-                               notifyAll();
-                               return true;
-                       }
+               if(task != LocalTaskQueue.NO_MORE_TASKS) {
+                       OOCEvictionManager.put(_streamId, _numBlocks, task);
+                       if (_index != null)
+                               _index.put(task.getIndexes(), _numBlocks);
+                       _numBlocks++;
+                       notifyAll();
+                       return false;
+               }
+               else {
+                       _cacheInProgress = false; // caching is complete
+                       notifyAll();
+                       return true;
                }
        }
 
        public synchronized IndexedMatrixValue get(int idx) throws 
InterruptedException {
                while (true) {
-                       if (idx < _numBlocks)
-                               return OOCEvictionManager.get(_streamId, idx);
-                       else if (!_cacheInProgress)
+                       if (idx < _numBlocks) {
+                               IndexedMatrixValue out = 
OOCEvictionManager.get(_streamId, idx);
+
+                               if (_index != null) // Ensure index is up to 
date
+                                       _index.putIfAbsent(out.getIndexes(), 
idx);
+
+                               return out;
+                       } else if (!_cacheInProgress)
                                return 
(IndexedMatrixValue)LocalTaskQueue.NO_MORE_TASKS;
 
                        wait();
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java
new file mode 100644
index 0000000000..1d555da8d6
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.IndexingCPInstruction;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.util.IndexRange;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+public abstract class IndexingOOCInstruction extends UnaryOOCInstruction {
+       // Scalar operands holding the 1-based lower/upper row and column bounds.
+       protected final CPOperand rowLower, rowUpper, colLower, colUpper;
+
+       /**
+        * Parses an OOC indexing instruction by reusing the CP indexing parser and converting
+        * the result into its out-of-core counterpart.
+        *
+        * @param str serialized instruction string
+        * @return the corresponding OOC indexing instruction
+        * @throws NotImplementedException if the opcode or input data type is not supported out-of-core
+        */
+       public static IndexingOOCInstruction parseInstruction(String str) {
+               IndexingCPInstruction cpInst = IndexingCPInstruction.parseInstruction(str);
+               return parseInstruction(cpInst);
+       }
+
+       /**
+        * Converts a parsed CP indexing instruction into an OOC indexing instruction.
+        * Currently only right indexing over matrix inputs is supported.
+        *
+        * @param cpInst parsed CP indexing instruction
+        * @return the corresponding OOC indexing instruction
+        * @throws NotImplementedException if the opcode or input data type is not supported out-of-core
+        */
+       public static IndexingOOCInstruction parseInstruction(IndexingCPInstruction cpInst) {
+               String opcode = cpInst.getOpcode();
+
+               if(opcode.equalsIgnoreCase(Opcodes.RIGHT_INDEX.toString())) {
+                       if(cpInst.input1.getDataType().isMatrix()) {
+                               return new MatrixIndexingOOCInstruction(cpInst.input1, cpInst.getRowLower(), cpInst.getRowUpper(),
+                                       cpInst.getColLower(), cpInst.getColUpper(), cpInst.output, cpInst.getOpcode(),
+                                       cpInst.getInstructionString());
+                       }
+                       throw new NotImplementedException(
+                               "OOC right indexing is not supported for data type: " + cpInst.input1.getDataType());
+               }
+
+               throw new NotImplementedException("Unsupported OOC indexing opcode: " + opcode);
+       }
+
+       protected IndexingOOCInstruction(CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
+               CPOperand out, String opcode, String istr) {
+               super(OOCInstruction.OOCType.MatrixIndexing, null, in, out, opcode, istr);
+               rowLower = rl;
+               rowUpper = ru;
+               colLower = cl;
+               colUpper = cu;
+       }
+
+       protected IndexingOOCInstruction(CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl,
+               CPOperand cu, CPOperand out, String opcode, String istr) {
+               super(OOCInstruction.OOCType.MatrixIndexing, null, lhsInput, rhsInput, out, opcode, istr);
+               rowLower = rl;
+               rowUpper = ru;
+               colLower = cl;
+               colUpper = cu;
+       }
+
+       /**
+        * Evaluates the bound operands and converts them from 1-based DML indices into a
+        * 0-based {@link IndexRange}.
+        * <p>
+        * The bounds stay {@code long}: out-of-core inputs may exceed the {@code int} range, and
+        * narrowing here would silently wrap large indices to negative values.
+        *
+        * @param ec execution context used to resolve the scalar bound operands
+        * @return 0-based index range of the requested slice
+        */
+       protected IndexRange getIndexRange(ExecutionContext ec) {
+               return new IndexRange( //rl, ru, cl, cu
+                       ec.getScalarInput(rowLower).getLongValue() - 1,
+                       ec.getScalarInput(rowUpper).getLongValue() - 1,
+                       ec.getScalarInput(colLower).getLongValue() - 1,
+                       ec.getScalarInput(colUpper).getLongValue() - 1);
+       }
+
+       /**
+        * Re-tiles input blocks for an index range that does not start on a block boundary.
+        * Each output block ("sector") is assembled from up to 2x2 overlapping input blocks.
+        * Once every part of a sector has been placed, the sector is released and handed to the emitter.
+        *
+        * @param <T> payload stored per input block
+        */
+       public static class BlockAligner<T> {
+               private final int _blocksize;
+               private final IndexRange _indexRange; // requested 0-based cell range
+               private final IndexRange _blockRange; // input block rows/cols touched by _indexRange (upper bounds + 1)
+               private final int _outRows; // number of output block rows
+               private final int _outCols; // number of output block columns
+               private final Sector<T>[] _blocks; // pending sectors, row-major; null if not yet created or already emitted
+               private final AtomicInteger _emitCtr; // number of sectors emitted so far
+
+               @SuppressWarnings("unchecked")
+               public BlockAligner(IndexRange range, int blocksize) {
+                       _indexRange = range;
+                       _blocksize = blocksize;
+
+                       long firstBlockRow = range.rowStart / blocksize;
+                       long lastBlockRow = range.rowEnd / blocksize;
+                       long firstBlockCol = range.colStart / blocksize;
+                       long lastBlockCol = range.colEnd / blocksize;
+                       _blockRange = new IndexRange(firstBlockRow, lastBlockRow + 1, firstBlockCol, lastBlockCol + 1);
+
+                       long totalRows = range.rowSpan() + 1;
+                       long totalCols = range.colSpan() + 1;
+                       _outRows = (int) ((totalRows + blocksize - 1) / blocksize);
+                       _outCols = (int) ((totalCols + blocksize - 1) / blocksize);
+
+                       _blocks = (Sector<T>[]) new Sector[_outRows * _outCols];
+                       _emitCtr = new AtomicInteger(0);
+               }
+
+               /** @return true if the index range starts on a block boundary in both dimensions */
+               public boolean isAligned() {
+                       return (_indexRange.rowStart % _blocksize) == 0 && (_indexRange.colStart % _blocksize) == 0;
+               }
+
+               /**
+                * Places one input block into every output sector it overlaps. Each sector that
+                * becomes complete is emitted.
+                *
+                * @param index   1-based block index of the input block
+                * @param data    payload stored for this input block
+                * @param emitter receives the 1-based output block index and the completed sector
+                * @return true if this call emitted the last outstanding sector
+                */
+               public boolean putNext(MatrixIndexes index, T data, BiConsumer<MatrixIndexes, Sector<T>> emitter) {
+                       long blockRow = index.getRowIndex() - 1;
+                       long blockCol = index.getColumnIndex() - 1;
+
+                       if(!_blockRange.isWithin(blockRow, blockCol))
+                               return false;
+
+                       long blockRowStart = blockRow * _blocksize;
+                       long blockRowEnd = blockRowStart + _blocksize - 1;
+                       long blockColStart = blockCol * _blocksize;
+                       long blockColEnd = blockColStart + _blocksize - 1;
+
+                       long overlapRowStart = Math.max(_indexRange.rowStart, blockRowStart);
+                       long overlapRowEnd = Math.min(_indexRange.rowEnd, blockRowEnd);
+                       long overlapColStart = Math.max(_indexRange.colStart, blockColStart);
+                       long overlapColEnd = Math.min(_indexRange.colEnd, blockColEnd);
+
+                       // Defensive guard: a block that does not intersect the range could otherwise be
+                       // mapped onto an existing sector by the integer divisions below.
+                       if(overlapRowStart > overlapRowEnd || overlapColStart > overlapColEnd)
+                               return false;
+
+                       int outRowStart = (int) ((overlapRowStart - _indexRange.rowStart) / _blocksize);
+                       int outRowEnd = (int) ((overlapRowEnd - _indexRange.rowStart) / _blocksize);
+                       int outColStart = (int) ((overlapColStart - _indexRange.colStart) / _blocksize);
+                       int outColEnd = (int) ((overlapColEnd - _indexRange.colStart) / _blocksize);
+
+                       int emitCtr = -1;
+
+                       for(int outRow = outRowStart; outRow <= outRowEnd; outRow++) {
+                               // Input block rows covered by this output block row (one or two).
+                               long targetRowStartGlobal = _indexRange.rowStart + (long) outRow * _blocksize;
+                               long targetRowEndGlobal = Math.min(_indexRange.rowEnd, targetRowStartGlobal + _blocksize - 1);
+                               long targetStartBlockRow = targetRowStartGlobal / _blocksize;
+                               long targetEndBlockRow = targetRowEndGlobal / _blocksize;
+                               int rowSegments = (int) (targetEndBlockRow - targetStartBlockRow + 1);
+
+                               for(int outCol = outColStart; outCol <= outColEnd; outCol++) {
+                                       // Input block columns covered by this output block column (one or two).
+                                       long targetColStartGlobal = _indexRange.colStart + (long) outCol * _blocksize;
+                                       long targetColEndGlobal = Math.min(_indexRange.colEnd, targetColStartGlobal + _blocksize - 1);
+                                       long targetStartBlockCol = targetColStartGlobal / _blocksize;
+                                       long targetEndBlockCol = targetColEndGlobal / _blocksize;
+                                       int colSegments = (int) (targetEndBlockCol - targetStartBlockCol + 1);
+
+                                       // Position of this input block inside the sector's 2x2 (or smaller) grid.
+                                       int rowOffset = (rowSegments == 1) ? 0 : (blockRow == targetStartBlockRow ? 0 : 1);
+                                       int colOffset = (colSegments == 1) ? 0 : (blockCol == targetStartBlockCol ? 0 : 1);
+
+                                       Sector<T> sector = getOrCreate(outRow, outCol, rowSegments, colSegments);
+                                       if(sector == null)
+                                               continue;
+
+                                       boolean emit = sector.place(rowOffset, colOffset, data);
+                                       if(emit) {
+                                               release(resolveIndex(outRow, outCol));
+                                               emitCtr = _emitCtr.incrementAndGet();
+                                               emitter.accept(new MatrixIndexes(outRow + 1, outCol + 1), sector);
+                                       }
+                               }
+                       }
+
+                       return emitCtr >= _blocks.length;
+               }
+
+               /** Maps a 0-based output block position to its slot in _blocks, or -1 if out of range. */
+               private int resolveIndex(int row, int col) {
+                       if(row < 0 || row >= _outRows || col < 0 || col >= _outCols)
+                               return -1;
+                       return row * _outCols + col;
+               }
+
+               /** Returns the pending sector for an output block, creating one sized to its segment counts. */
+               private synchronized Sector<T> getOrCreate(int outRow, int outCol, int rowSegments, int colSegments) {
+                       int idx = resolveIndex(outRow, outCol);
+                       if(idx == -1)
+                               return null;
+
+                       Sector<T> s = _blocks[idx];
+                       if(s == null) {
+                               if(rowSegments == 1 && colSegments == 1)
+                                       s = new Sector1<>();
+                               else if(rowSegments == 1 && colSegments == 2)
+                                       s = new Sector2Col<>();
+                               else if(rowSegments == 2 && colSegments == 1)
+                                       s = new Sector2Row<>();
+                               else
+                                       s = new Sector4<>();
+                               _blocks[idx] = s;
+                       }
+
+                       return s;
+               }
+
+               /** Drops an emitted sector; guarded by the same monitor getOrCreate uses for _blocks. */
+               private synchronized void release(int idx) {
+                       _blocks[idx] = null;
+               }
+
+               /**
+                * Verifies that every sector has been emitted.
+                *
+                * @throws DMLRuntimeException if some sectors are still incomplete
+                */
+               public synchronized void close() {
+                       if(_emitCtr.get() != _blocks.length)
+                               throw new DMLRuntimeException("BlockAligner still has some unfinished sectors");
+               }
+       }
+
+       /**
+        * A single output block assembled from one or more input parts, addressed by (rowOffset, colOffset).
+        *
+        * @param <T> payload type of each part
+        */
+       public static abstract class Sector<T> {
+               /** Stores a part; returns true once the sector holds all of its parts. */
+               public abstract boolean place(int rowOffset, int colOffset, T data);
+
+               /** Returns the part at the given offset, or null if absent or out of range. */
+               public abstract T get(int rowOffset, int colOffset);
+
+               /** Returns the number of parts placed so far. */
+               public abstract int count();
+       }
+
+       /** Sector consisting of exactly one part at offset (0, 0). */
+       public static class Sector1<T> extends Sector<T> {
+               private T _data;
+
+               @Override
+               public synchronized boolean place(int rowOffset, int colOffset, T data) {
+                       if(rowOffset != 0 || colOffset != 0)
+                               return false;
+
+                       _data = data;
+                       return true;
+               }
+
+               @Override
+               public synchronized T get(int rowOffset, int colOffset) {
+                       return (rowOffset == 0 && colOffset == 0) ? _data : null;
+               }
+
+               @Override
+               public synchronized int count() {
+                       return _data == null ? 0 : 1;
+               }
+       }
+
+       /** Sector consisting of a 2x2 grid of parts. */
+       public static class Sector4<T> extends Sector<T> {
+               private int _count;
+               private final T[] _data; // row-major: index = rowOffset * 2 + colOffset
+
+               @SuppressWarnings("unchecked")
+               public Sector4() {
+                       _count = 0;
+                       _data = (T[]) new Object[4];
+               }
+
+               @Override
+               public synchronized boolean place(int rowOffset, int colOffset, T data) {
+                       // Reject offsets outside the 2x2 grid instead of aliasing another slot.
+                       if(!inBounds(rowOffset, colOffset))
+                               return false;
+
+                       int pos = rowOffset * 2 + colOffset;
+                       if(_data[pos] == null) {
+                               _data[pos] = data;
+                               _count++;
+                       }
+                       return _count == 4;
+               }
+
+               @Override
+               public synchronized T get(int rowOffset, int colOffset) {
+                       return inBounds(rowOffset, colOffset) ? _data[rowOffset * 2 + colOffset] : null;
+               }
+
+               @Override
+               public synchronized int count() {
+                       return _count;
+               }
+
+               private static boolean inBounds(int rowOffset, int colOffset) {
+                       return rowOffset >= 0 && rowOffset < 2 && colOffset >= 0 && colOffset < 2;
+               }
+       }
+
+       /** Sector consisting of two horizontally adjacent parts (one row, two columns). */
+       public static class Sector2Col<T> extends Sector<T> {
+               private int _count;
+               private final T[] _data;
+
+               @SuppressWarnings("unchecked")
+               public Sector2Col() {
+                       _count = 0;
+                       _data = (T[]) new Object[2];
+               }
+
+               @Override
+               public synchronized boolean place(int rowOffset, int colOffset, T data) {
+                       if(rowOffset != 0 || colOffset < 0 || colOffset > 1)
+                               return false;
+
+                       if(_data[colOffset] == null) {
+                               _data[colOffset] = data;
+                               _count++;
+                       }
+
+                       return _count == 2;
+               }
+
+               @Override
+               public synchronized T get(int rowOffset, int colOffset) {
+                       return (rowOffset == 0 && colOffset >= 0 && colOffset < 2) ? _data[colOffset] : null;
+               }
+
+               @Override
+               public synchronized int count() {
+                       return _count;
+               }
+       }
+
+       /** Sector consisting of two vertically adjacent parts (two rows, one column). */
+       public static class Sector2Row<T> extends Sector<T> {
+               private int _count;
+               private final T[] _data;
+
+               @SuppressWarnings("unchecked")
+               public Sector2Row() {
+                       _count = 0;
+                       _data = (T[]) new Object[2];
+               }
+
+               @Override
+               public synchronized boolean place(int rowOffset, int colOffset, T data) {
+                       if(colOffset != 0 || rowOffset < 0 || rowOffset > 1)
+                               return false;
+
+                       if(_data[rowOffset] == null) {
+                               _data[rowOffset] = data;
+                               _count++;
+                       }
+
+                       return _count == 2;
+               }
+
+               @Override
+               public synchronized T get(int rowOffset, int colOffset) {
+                       return (colOffset == 0 && rowOffset >= 0 && rowOffset < 2) ? _data[rowOffset] : null;
+               }
+
+               @Override
+               public synchronized int count() {
+                       return _count;
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java
new file mode 100644
index 0000000000..a04a77677c
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.util.IndexRange;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class MatrixIndexingOOCInstruction extends IndexingOOCInstruction {
+
+       /**
+        * Creates a matrix right-indexing instruction over a single input.
+        *
+        * @param in     matrix operand to index
+        * @param rl     scalar operand holding the 1-based lower row bound
+        * @param ru     scalar operand holding the 1-based upper row bound
+        * @param cl     scalar operand holding the 1-based lower column bound
+        * @param cu     scalar operand holding the 1-based upper column bound
+        * @param out    output operand
+        * @param opcode instruction opcode
+        * @param istr   instruction string
+        */
+       public MatrixIndexingOOCInstruction(CPOperand in, CPOperand rl, 
CPOperand ru, CPOperand cl, CPOperand cu,
+               CPOperand out, String opcode, String istr) {
+               super(in, rl, ru, cl, cu, out, opcode, istr);
+       }
+
+       /**
+        * Two-input variant that delegates to the two-input IndexingOOCInstruction constructor.
+        * NOTE(review): not used by IndexingOOCInstruction.parseInstruction; presumably
+        * reserved for left indexing — confirm.
+        */
+       protected MatrixIndexingOOCInstruction(CPOperand lhsInput, CPOperand 
rhsInput, CPOperand rl, CPOperand ru,
+               CPOperand cl, CPOperand cu, CPOperand out, String opcode, 
String istr) {
+               super(lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               String opcode = getOpcode();
+               IndexRange ix = getIndexRange(ec);
+
+               MatrixObject mo = ec.getMatrixObject(input1.getName());
+               int blocksize = mo.getBlocksize();
+               long firstBlockRow = ix.rowStart / blocksize;
+               long lastBlockRow = ix.rowEnd / blocksize;
+               long firstBlockCol = ix.colStart / blocksize;
+               long lastBlockCol = ix.colEnd / blocksize;
+
+               boolean inRange = ix.rowStart < mo.getNumRows() && ix.colStart 
< mo.getNumColumns();
+
+               OOCStream<IndexedMatrixValue> qIn = mo.getStreamHandle();
+               OOCStream<IndexedMatrixValue> qOut = createWritableStream();
+
+               addInStream(qIn);
+               addOutStream(qOut);
+
+               MatrixObject mOut = ec.getMatrixObject(output);
+               mOut.setStreamHandle(qOut);
+
+               //right indexing
+               if(opcode.equalsIgnoreCase(Opcodes.RIGHT_INDEX.toString())) {
+                       if(output.isScalar() && inRange) {
+                               IndexedMatrixValue tmp;
+
+                               while((tmp = qIn.dequeue()) != 
LocalTaskQueue.NO_MORE_TASKS) {
+                                       if(tmp.getIndexes().getRowIndex() == 
firstBlockRow &&
+                                               
tmp.getIndexes().getColumnIndex() == firstBlockCol) {
+                                               
ec.setScalarOutput(output.getName(), new DoubleObject(
+                                                       
tmp.getValue().get((int) ix.rowStart % blocksize, (int) ix.rowEnd % 
blocksize)));
+                                               return;
+                                       }
+                               }
+
+                               throw new DMLRuntimeException("Desired block 
not found");
+                       }
+
+                       final AtomicReference<CompletableFuture<Void>> 
futureRef = new AtomicReference<>();
+
+                       if(ix.rowStart % blocksize == 0 && ix.colStart % 
blocksize == 0) {
+                               // Aligned case: interior blocks can be 
forwarded directly, borders may require slicing
+                               final int outBlockRows = (int) 
Math.ceil((double) (ix.rowSpan() + 1) / blocksize);
+                               final int outBlockCols = (int) 
Math.ceil((double) (ix.colSpan() + 1) / blocksize);
+                               final int totalBlocks = outBlockRows * 
outBlockCols;
+                               final AtomicInteger producedBlocks = new 
AtomicInteger(0);
+
+                               CompletableFuture<Void> future = filterOOC(qIn, 
tmp -> {
+                                       MatrixIndexes inIdx = tmp.getIndexes();
+                                       long blockRow = inIdx.getRowIndex() - 1;
+                                       long blockCol = inIdx.getColumnIndex() 
- 1;
+
+                                       MatrixBlock block = (MatrixBlock) 
tmp.getValue();
+
+                                       int rowStartLocal = (blockRow == 
firstBlockRow) ? (int) (ix.rowStart % blocksize) : 0;
+                                       int rowEndLocal = (blockRow == 
lastBlockRow) ? Math.min(block.getNumRows() - 1,
+                                               (int) (ix.rowEnd % blocksize)) 
: block.getNumRows() - 1;
+                                       int colStartLocal = (blockCol == 
firstBlockCol) ? (int) (ix.colStart % blocksize) : 0;
+                                       int colEndLocal = (blockCol == 
lastBlockCol) ? Math.min(block.getNumColumns() - 1,
+                                               (int) (ix.colEnd % blocksize)) 
: block.getNumColumns() - 1;
+
+                                       MatrixBlock outBlock;
+                                       if(rowStartLocal == 0 && rowEndLocal == 
block.getNumRows() - 1 && colStartLocal == 0 &&
+                                               colEndLocal == 
block.getNumColumns() - 1) {
+                                               outBlock = block;
+                                       }
+                                       else {
+                                               outBlock = 
block.slice(rowStartLocal, rowEndLocal, colStartLocal, colEndLocal);
+                                       }
+
+                                       long outBlockRow = blockRow - 
firstBlockRow + 1;
+                                       long outBlockCol = blockCol - 
firstBlockCol + 1;
+                                       qOut.enqueue(new IndexedMatrixValue(new 
MatrixIndexes(outBlockRow, outBlockCol), outBlock));
+
+                                       if(producedBlocks.incrementAndGet() >= 
totalBlocks) {
+                                               CompletableFuture<Void> f = 
futureRef.get();
+                                               if(f != null)
+                                                       f.cancel(true);
+                                       }
+                               }, tmp -> {
+                                       long blockRow = 
tmp.getIndexes().getRowIndex() - 1;
+                                       long blockCol = 
tmp.getIndexes().getColumnIndex() - 1;
+                                       return blockRow >= firstBlockRow && 
blockRow <= lastBlockRow && blockCol >= firstBlockCol &&
+                                               blockCol <= lastBlockCol;
+                               }, qOut::closeInput);
+                               futureRef.set(future);
+                               return;
+                       }
+
+                       final BlockAligner<IndexedBlockMeta> aligner = new 
BlockAligner<>(ix, blocksize);
+
+                       // We may need to construct our own intermediate stream 
to properly manage the cached items
+                       boolean hasIntermediateStream = !qIn.hasStreamCache();
+                       final CachingStream cachedStream = 
hasIntermediateStream ? new CachingStream(new SubscribableTaskQueue<>()) : 
qOut.getStreamCache();
+                       cachedStream.activateIndexing();
+
+                       CompletableFuture<Void> future = 
filterOOC(qIn.getReadStream(), tmp -> {
+                               if (hasIntermediateStream) {
+                                       // We write to an intermediate stream 
to ensure that these matrix blocks are properly cached
+                                       
cachedStream.getWriteStream().enqueue(tmp);
+                               }
+
+                               boolean completed = 
aligner.putNext(tmp.getIndexes(), new IndexedBlockMeta(tmp), (idx, sector) -> {
+                                       int targetBlockRow = (int) 
(idx.getRowIndex() - 1);
+                                       int targetBlockCol = (int) 
(idx.getColumnIndex() - 1);
+
+                                       long targetRowStartGlobal = ix.rowStart 
+ (long) targetBlockRow * blocksize;
+                                       long targetRowEndGlobal = 
Math.min(ix.rowEnd, targetRowStartGlobal + blocksize - 1);
+                                       long targetColStartGlobal = ix.colStart 
+ (long) targetBlockCol * blocksize;
+                                       long targetColEndGlobal = 
Math.min(ix.colEnd, targetColStartGlobal + blocksize - 1);
+
+                                       int nRows = (int) (targetRowEndGlobal - 
targetRowStartGlobal + 1);
+                                       int nCols = (int) (targetColEndGlobal - 
targetColStartGlobal + 1);
+
+                                       long firstSrcBlockRow = 
targetRowStartGlobal / blocksize;
+                                       long lastSrcBlockRow = 
targetRowEndGlobal / blocksize;
+                                       int rowSegments = (int) 
(lastSrcBlockRow - firstSrcBlockRow + 1);
+
+                                       long firstSrcBlockCol = 
targetColStartGlobal / blocksize;
+                                       long lastSrcBlockCol = 
targetColEndGlobal / blocksize;
+                                       int colSegments = (int) 
(lastSrcBlockCol - firstSrcBlockCol + 1);
+
+                                       MatrixBlock target = null;
+
+                                       for(int r = 0; r < rowSegments; r++) {
+                                               for(int c = 0; c < colSegments; 
c++) {
+                                                       IndexedBlockMeta ibm = 
sector.get(r, c);
+                                                       if(ibm == null)
+                                                               continue;
+
+                                                       IndexedMatrixValue mv = 
cachedStream.findCached(ibm.idx);
+                                                       MatrixBlock srcBlock = 
(MatrixBlock) mv.getValue();
+
+                                                       if(target == null)
+                                                               target = new 
MatrixBlock(nRows, nCols, srcBlock.isInSparseFormat());
+
+                                                       long srcBlockRowStart = 
(ibm.idx.getRowIndex() - 1) * blocksize;
+                                                       long srcBlockColStart = 
(ibm.idx.getColumnIndex() - 1) * blocksize;
+                                                       long 
sliceRowStartGlobal = Math.max(targetRowStartGlobal, srcBlockRowStart);
+                                                       long sliceRowEndGlobal 
= Math.min(targetRowEndGlobal,
+                                                               
srcBlockRowStart + srcBlock.getNumRows() - 1);
+                                                       long 
sliceColStartGlobal = Math.max(targetColStartGlobal, srcBlockColStart);
+                                                       long sliceColEndGlobal 
= Math.min(targetColEndGlobal,
+                                                               
srcBlockColStart + srcBlock.getNumColumns() - 1);
+
+                                                       int sliceRowStart = 
(int) (sliceRowStartGlobal - srcBlockRowStart);
+                                                       int sliceRowEnd = (int) 
(sliceRowEndGlobal - srcBlockRowStart);
+                                                       int sliceColStart = 
(int) (sliceColStartGlobal - srcBlockColStart);
+                                                       int sliceColEnd = (int) 
(sliceColEndGlobal - srcBlockColStart);
+
+                                                       int targetRowOffset = 
(int) (sliceRowStartGlobal - targetRowStartGlobal);
+                                                       int targetColOffset = 
(int) (sliceColStartGlobal - targetColStartGlobal);
+
+                                                       MatrixBlock sliced = 
srcBlock.slice(sliceRowStart, sliceRowEnd, sliceColStart, sliceColEnd);
+                                                       sliced.putInto(target, 
targetRowOffset, targetColOffset, true);
+                                               }
+                                       }
+
+                                       qOut.enqueue(new 
IndexedMatrixValue(idx, target));
+                               });
+
+                               if(completed) {
+                                       // All blocks have been processed; we 
can cancel the future
+                                       // Currently, this does not affect 
processing (predicates prevent task submission anyway).
+                                       // However, a cancelled future may 
allow early file read aborts once implemented.
+                                       CompletableFuture<Void> f = 
futureRef.get();
+                                       if(f != null)
+                                               f.cancel(true);
+                               }
+                       }, tmp -> {
+                               // Pre-filter incoming blocks to avoid 
unnecessary task submission
+                               long blockRow = tmp.getIndexes().getRowIndex() 
- 1;
+                               long blockCol = 
tmp.getIndexes().getColumnIndex() - 1;
+                               return blockRow >= firstBlockRow && blockRow <= 
lastBlockRow && blockCol >= firstBlockCol &&
+                                       blockCol <= lastBlockCol;
+                       }, () -> {
+                               aligner.close();
+                               qOut.closeInput();
+                       });
+                       futureRef.set(future);
+               }
+               //left indexing
+               else if(opcode.equalsIgnoreCase(Opcodes.LEFT_INDEX.toString())) 
{
+                       throw new NotImplementedException();
+               }
+               else
+                       throw new DMLRuntimeException(
+                               "Invalid opcode (" + opcode + ") encountered in 
MatrixIndexingOOCInstruction.");
+       }
+
+       /**
+        * Handle for a streamed input block that retains only its block indexes;
+        * the block data itself is looked up again via the stream cache when needed.
+        */
+       private static class IndexedBlockMeta {
+               public final MatrixIndexes idx;
+
+               public IndexedBlockMeta(IndexedMatrixValue mv) {
+                       this.idx = mv.getIndexes();
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java
index 5f97bd99e9..f136ffc2bb 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java
@@ -26,7 +26,7 @@ public class SubscribableTaskQueue<T> extends 
LocalTaskQueue<T> implements OOCSt
        private Runnable _subscriber;
 
        @Override
-       public void enqueue(T t) {
+       public synchronized void enqueue(T t) {
                try {
                        super.enqueueTask(t);
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
index 08f00f86d2..d45d00db59 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
@@ -36,6 +36,12 @@ public class UnaryOOCInstruction extends 
ComputationOOCInstruction {
                _uop = op;
        }
 
+       /** Variant that additionally forwards a second input operand {@code in2} to the superclass and keeps the unary operator. */
+       protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+               super(type, op, in1, in2, out, opcode, istr);
+               this._uop = op;
+       }
+
        public static UnaryOOCInstruction parseInstruction(String str) {
                String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
                InstructionUtils.checkNumFields(parts, 2);
diff --git a/src/main/java/org/apache/sysds/runtime/util/IndexRange.java 
b/src/main/java/org/apache/sysds/runtime/util/IndexRange.java
index 4a8d999147..44fa8320e3 100644
--- a/src/main/java/org/apache/sysds/runtime/util/IndexRange.java
+++ b/src/main/java/org/apache/sysds/runtime/util/IndexRange.java
@@ -30,6 +30,16 @@ public class IndexRange implements Serializable
        public long rowEnd = 0;
        public long colStart = 0;
        public long colEnd = 0;
+
+       /**
+        * Returns the component-wise intersection of two ranges (maximum of the starts,
+        * minimum of the ends). No overlap check is performed, so for disjoint inputs the
+        * returned range is empty or inverted and callers must validate it before use.
+        */
+       public static IndexRange intersect(IndexRange a, IndexRange b) {
+               return new IndexRange(Math.max(a.rowStart, b.rowStart), Math.min(a.rowEnd, b.rowEnd),
+                       Math.max(a.colStart, b.colStart), Math.min(a.colEnd, b.colEnd));
+       }
        
        public IndexRange(long rs, long re, long cs, long ce) {
                set(rs, re, cs, ce);
@@ -52,6 +62,11 @@ public class IndexRange implements Serializable
                        colStart + delta, colEnd + delta);
        }
 
+       /** Returns a new range shifted by {@code rowDelta} rows and {@code colDelta} columns. */
+       public IndexRange add(long rowDelta, long colDelta) {
+               return new IndexRange(rowStart + rowDelta, rowEnd + rowDelta, colStart + colDelta, colEnd + colDelta);
+       }
+
        public boolean inColRange(long col) {
                return col >= colStart && col < colEnd;
        }
@@ -68,6 +83,16 @@ public class IndexRange implements Serializable
                return rowEnd - rowStart;
        }
 
+       /**
+        * Returns true if cell (row, col) lies within this range, as decided by
+        * {@code inRowRange} and {@code inColRange}. NOTE(review): inColRange treats colEnd
+        * as exclusive (inRowRange presumably treats rowEnd likewise - confirm), so a cell
+        * on the end row/column is reported as outside.
+        */
+       public boolean isWithin(long row, long col) {
+               return inColRange(col) && inRowRange(row);
+       }
+
        @Override
        public String toString() {
                return "["+rowStart+":"+rowEnd+","+colStart+":"+colEnd+"]";
diff --git 
a/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java
 
b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java
new file mode 100644
index 0000000000..0dab096e0c
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class RightIndexingCoordsTest extends AutomatedTestBase {
+       private final static String TEST_NAME1 = "RightIndexingCoords";
+       private final static String TEST_DIR = "functions/ooc/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + RightIndexingCoordsTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-8;
+       private static final String INPUT_NAME = "X";
+       private static final String OUTPUT_NAME = "res";
+
+       private final static int nrows = 2300;
+       private final static int ncols = 1200;
+       private final static int maxVal = 7;
+       private final static double sparsity1 = 1;
+       private final static double sparsity2 = 0.05;
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+               addTestConfiguration(TEST_NAME1, config);
+       }
+
+       @Test
+       public void testRightIndexingDense1() {
+               runRightIndexingTest(1, 1, false);
+       }
+
+       @Test
+       public void testRightIndexingSparse1() {
+               runRightIndexingTest(1, 1, true);
+       }
+
+       @Test
+       public void testRightIndexingDense2() {
+               runRightIndexingTest(1000, 1, false);
+       }
+
+       @Test
+       public void testRightIndexingSparse2() {
+               runRightIndexingTest(1000, 1, true);
+       }
+
+       private void runRightIndexingTest(int rs, int cs, boolean sparse) {
+               Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+               try {
+                       getAndLoadTestConfiguration(TEST_NAME1);
+
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+                       programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), "" + rs, "" + cs, output(OUTPUT_NAME)};
+
+                       // 1. Generate the data in-memory as a double[][] array
+                       double[][] X_data = getRandomMatrix(nrows, ncols, 1, maxVal, sparse ? sparsity2 : sparsity1, 7);
+
+                       // 2. Convert the double array to a MatrixBlock
+                       MatrixBlock X_mb = DataConverter.convertToMatrixBlock(X_data);
+
+                       // 3. Create a binary matrix writer
+                       MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+
+                       // 4. Write matrix X to a binary SequenceFile
+                       writer.writeMatrixToHDFS(X_mb, input(INPUT_NAME), nrows, ncols, 1000, X_mb.getNonZeros());
+                       HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+                               new MatrixCharacteristics(nrows, ncols, 1000, X_mb.getNonZeros()), Types.FileFormat.BINARY);
+
+                       runTest(true, false, null, -1);
+
+                       // check that the OOC right-indexing instruction was executed
+                       Assert.assertTrue("OOC wasn't used for right indexing",
+                               heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.RIGHT_INDEX));
+
+                       // compare results
+
+                       // rerun without ooc flag to obtain the in-memory reference result
+                       programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME), "" + rs, "" + cs, output(OUTPUT_NAME + "_target")};
+                       runTest(true, false, null, -1);
+
+                       // compare matrices; X[rs, cs] yields a 1x1 result
+
+                       MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME),
+                               Types.FileFormat.BINARY, 1, 1, 1000);
+                       MatrixBlock ret2 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME + "_target"),
+                               Types.FileFormat.BINARY, 1, 1, 1000);
+
+                       // ret2 (in-memory run) is the reference for the OOC result ret1
+                       TestUtils.compareMatrices(ret2, ret1, eps);
+               }
+               catch(IOException e) {
+                       throw new RuntimeException(e);
+               }
+               finally {
+                       resetExecMode(platformOld);
+               }
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java 
b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java
new file mode 100644
index 0000000000..5bd061c646
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class RightIndexingTest extends AutomatedTestBase {
+       private final static String TEST_NAME1 = "RightIndexing";
+       private final static String TEST_DIR = "functions/ooc/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + RightIndexingTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-8;
+       private static final String INPUT_NAME = "X";
+       private static final String OUTPUT_NAME = "res";
+
+       private final static int maxVal = 7;
+       private final static double sparsity1 = 1;
+       private final static double sparsity2 = 0.05;
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+               addTestConfiguration(TEST_NAME1, config);
+       }
+
+       @Test
+       public void testRightIndexingDense1() {
+               runRightIndexingTest(2, 2002, 100, 1150, 2100, 1200, false);
+       }
+
+       @Test
+       public void testRightIndexingSparse1() {
+               runRightIndexingTest(2, 2002, 100, 1150, 2100, 1200, true);
+       }
+
+       @Test
+       public void testRightIndexingAlignedDense() {
+               runRightIndexingTest(1, 2002, 1, 1150, 2100, 1200, false);
+       }
+
+       @Test
+       public void testRightIndexingAlignedSparse() {
+               runRightIndexingTest(1, 2002, 1, 1150, 2100, 1200, true);
+       }
+
+       @Test
+       public void testRightIndexingRowAlignedDense() {
+               runRightIndexingTest(1, 2002, 100, 1150, 2100, 1200, false);
+       }
+
+       @Test
+       public void testRightIndexingRowAlignedSparse() {
+               runRightIndexingTest(1, 2002, 100, 1150, 2100, 1200, true);
+       }
+
+       @Test
+       public void testRightIndexingSmallDense1() {
+               runRightIndexingTest(1, 700, 150, 1020, 3000, 3000, false);
+       }
+
+       @Test
+       public void testRightIndexingSmallSparse1() {
+               runRightIndexingTest(1, 700, 150, 1020, 3000, 3000, true);
+       }
+
+       @Test
+       public void testRightIndexingSmallDense2() {
+               runRightIndexingTest(150, 1020, 1, 700, 3000, 3000, false);
+       }
+
+       @Test
+       public void testRightIndexingSmallSparse2() {
+               runRightIndexingTest(150, 1020, 1, 700, 3000, 3000, true);
+       }
+
+       @Test
+       public void testRightIndexingSingleElementDense() {
+               runRightIndexingTest(1111, 1111, 2222, 2222, 3000, 3000, false);
+       }
+
+       @Test
+       public void testRightIndexingSingleElementSparse() {
+               runRightIndexingTest(1111, 1111, 2222, 2222, 3000, 3000, true);
+       }
+
+       @Test
+       public void testRightIndexingCrossBlockBothDense() {
+               runRightIndexingTest(950, 1050, 995, 1005, 3000, 3000, false);
+       }
+
+       @Test
+       public void testRightIndexingCrossBlockBothSparse() {
+               runRightIndexingTest(950, 1050, 995, 1005, 3000, 3000, true);
+       }
+
+       @Test
+       public void testRightIndexingSingleRowMultiBlockDense() {
+               runRightIndexingTest(1001, 1001, 800, 1205, 3000, 3000, false);
+       }
+
+       @Test
+       public void testRightIndexingSingleRowMultiBlockSparse() {
+               runRightIndexingTest(1001, 1001, 800, 1205, 3000, 3000, true);
+       }
+
+       @Test
+       public void testRightIndexingSingleColumnMultiBlockDense() {
+               runRightIndexingTest(800, 1205, 1001, 1001, 3000, 3000, false);
+       }
+
+       @Test
+       public void testRightIndexingSingleColumnMultiBlockSparse() {
+               runRightIndexingTest(800, 1205, 1001, 1001, 3000, 3000, true);
+       }
+
+       @Test
+       public void testRightIndexingTrailingBlocksDense() {
+               runRightIndexingTest(2501, 3000, 1500, 2100, 3000, 3000, false);
+       }
+
+       @Test
+       public void testRightIndexingTrailingBlocksSparse() {
+               runRightIndexingTest(2501, 3000, 1500, 2100, 3000, 3000, true);
+       }
+
+       private void runRightIndexingTest(int rs, int re, int cs, int ce, int nrows, int ncols, boolean sparse) {
+               Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+               try {
+                       getAndLoadTestConfiguration(TEST_NAME1);
+
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+                       programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), "" + rs, "" + re, "" + cs, "" + ce, output(OUTPUT_NAME)};
+
+                       // 1. Generate the data in-memory as a double[][] array
+                       double[][] X_data = getRandomMatrix(nrows, ncols, 1, maxVal, sparse ? sparsity2 : sparsity1, 7);
+
+                       // 2. Convert the double array to a MatrixBlock
+                       MatrixBlock X_mb = DataConverter.convertToMatrixBlock(X_data);
+
+                       // 3. Create a binary matrix writer
+                       MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+
+                       // 4. Write matrix X to a binary SequenceFile
+                       writer.writeMatrixToHDFS(X_mb, input(INPUT_NAME), nrows, ncols, 1000, X_mb.getNonZeros());
+                       HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+                               new MatrixCharacteristics(nrows, ncols, 1000, X_mb.getNonZeros()), Types.FileFormat.BINARY);
+
+                       runTest(true, false, null, -1);
+
+                       // check that the OOC right-indexing instruction was executed
+                       Assert.assertTrue("OOC wasn't used for right indexing",
+                               heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.RIGHT_INDEX));
+
+                       // compare results
+
+                       // rerun without ooc flag to obtain the in-memory reference result
+                       programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME), "" + rs, "" + re, "" + cs, "" + ce, output(OUTPUT_NAME + "_target")};
+                       runTest(true, false, null, -1);
+
+                       // compare matrices; DML indexing bounds are inclusive
+                       int outNRows = re-rs+1;
+                       int outNCols = ce-cs+1;
+
+                       MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME),
+                               Types.FileFormat.BINARY, outNRows, outNCols, 1000);
+                       MatrixBlock ret2 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME + "_target"),
+                               Types.FileFormat.BINARY, outNRows, outNCols, 1000);
+
+                       // ret2 (in-memory run) is the reference for the OOC result ret1;
+                       // both are read with the expected output dimensions computed above
+                       // and must agree element-wise within eps.
+                       TestUtils.compareMatrices(ret2, ret1, eps);
+               }
+               catch(IOException e) {
+                       throw new RuntimeException(e);
+               }
+               finally {
+                       resetExecMode(platformOld);
+               }
+       }
+}
diff --git a/src/test/scripts/functions/ooc/RightIndexing.dml 
b/src/test/scripts/functions/ooc/RightIndexing.dml
new file mode 100644
index 0000000000..9abb5c6d93
--- /dev/null
+++ b/src/test/scripts/functions/ooc/RightIndexing.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the input matrix (streamed when executed with -ooc)
+X = read($1);
+
+# 1-based, inclusive bounds of the requested slice
+rowLower = $2;
+rowUpper = $3;
+colLower = $4;
+colUpper = $5;
+res = X[rowLower:rowUpper, colLower:colUpper];
+write(res, $6, format="binary");
diff --git a/src/test/scripts/functions/ooc/RightIndexingCoords.dml 
b/src/test/scripts/functions/ooc/RightIndexingCoords.dml
new file mode 100644
index 0000000000..61a2ddc747
--- /dev/null
+++ b/src/test/scripts/functions/ooc/RightIndexingCoords.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the input matrix (streamed when executed with -ooc)
+X = read($1);
+
+# 1-based coordinates of the single cell to extract
+rowIdx = $2;
+colIdx = $3;
+res = X[rowIdx, colIdx];
+write(res, $4, format="binary");


Reply via email to