mboehm7 commented on code in PR #2317:
URL: https://github.com/apache/systemds/pull/2317#discussion_r2309559493


##########
src/main/java/org/apache/sysds/hops/DataOp.java:
##########
@@ -60,6 +56,8 @@ public class DataOp extends Hop {
        
        private boolean _recompileRead = true;
 
+       private boolean _isTeeOp = false;

Review Comment:
   I would rather make it an op-type of DataOp, like TWRITE.



##########
src/main/java/org/apache/sysds/hops/DataOp.java:
##########
@@ -251,56 +253,66 @@ public Lop constructLops()
 
                ExecType et = optFindExecType();
                Lop l = null;
-               
-               // construct lops for all input parameters
-               HashMap<String, Lop> inputLops = new HashMap<>();
-               for (Entry<String, Integer> cur : _paramIndexMap.entrySet()) {
-                       inputLops.put(cur.getKey(), 
getInput().get(cur.getValue()).constructLops());
+
+               if (_isTeeOp) {
+                       Tee teeLop = new Tee(getInput().get(0).constructLops(),
+                                                       getDataType(), 
getValueType());
+                       setLineNumbers(teeLop);
+                       setLops(teeLop);
+                       setOutputDimensions(teeLop);
                }
+               else {
 
-               // Create the lop
-               switch(_op) 
-               {
-                       case TRANSIENTREAD:
-                               l = new Data(_op, null, inputLops, getName(), 
null, 
-                                               getDataType(), getValueType(), 
getFileFormat());
-                               setOutputDimensions(l);
-                               break;
-                               
-                       case PERSISTENTREAD:
-                               l = new Data(_op, null, inputLops, getName(), 
null, 
-                                               getDataType(), getValueType(), 
getFileFormat());
-                               
l.getOutputParameters().setDimensions(getDim1(), getDim2(), _inBlocksize, 
getNnz(), getUpdateType());
-                               break;
-                               
-                       case PERSISTENTWRITE:
-                       case FUNCTIONOUTPUT:
-                               l = new Data(_op, 
getInput().get(0).constructLops(), inputLops, getName(), null, 
-                                       getDataType(), getValueType(), 
getFileFormat());
-                               ((Data)l).setExecType(et);
-                               setOutputDimensions(l);
-                               break;
-                               
-                       case TRANSIENTWRITE:
-                               l = new Data(_op, 
getInput().get(0).constructLops(), inputLops, getName(), null,
-                                               getDataType(), getValueType(), 
getFileFormat());
-                               setOutputDimensions(l);
-                               break;
-                               
-                       case SQLREAD:
-                               l = new Sql(inputLops, getDataType(), 
getValueType());
-                               break;
-                       
-                       case FEDERATED:
-                               l = new Federated(inputLops, getDataType(), 
getValueType());
-                               break;
-                       
-                       default:
-                               throw new LopsException("Invalid operation type 
for Data LOP: " + _op);
+                       // construct lops for all input parameters
+                       HashMap<String, Lop> inputLops = new HashMap<>();
+                       for (Entry<String, Integer> cur : 
_paramIndexMap.entrySet()) {
+                               inputLops.put(cur.getKey(), 
getInput().get(cur.getValue()).constructLops());
+                       }
+
+                       // Create the lop
+                       switch (_op) {
+                               case TRANSIENTREAD:
+                                       l = new Data(_op, null, inputLops, 
getName(), null,
+                                                                       
getDataType(), getValueType(), getFileFormat());
+                                       setOutputDimensions(l);
+                                       break;
+
+                               case PERSISTENTREAD:
+                                       l = new Data(_op, null, inputLops, 
getName(), null,
+                                                                       
getDataType(), getValueType(), getFileFormat());
+                                       
l.getOutputParameters().setDimensions(getDim1(), getDim2(), _inBlocksize, 
getNnz(), getUpdateType());
+                                       break;
+
+                               case PERSISTENTWRITE:
+                               case FUNCTIONOUTPUT:
+                                       l = new Data(_op, 
getInput().get(0).constructLops(), inputLops, getName(), null,
+                                                                       
getDataType(), getValueType(), getFileFormat());
+                                       ((Data) l).setExecType(et);
+                                       setOutputDimensions(l);
+                                       break;
+
+                               case TRANSIENTWRITE:
+                                       l = new Data(_op, 
getInput().get(0).constructLops(), inputLops, getName(), null,
+                                                                       
getDataType(), getValueType(), getFileFormat());
+                                       setOutputDimensions(l);
+                                       break;
+
+                               case SQLREAD:
+                                       l = new Sql(inputLops, getDataType(), 
getValueType());
+                                       break;
+
+                               case FEDERATED:
+                                       l = new Federated(inputLops, 
getDataType(), getValueType());
+                                       break;
+
+                               default:
+                                       throw new LopsException("Invalid 
operation type for Data LOP: " + _op);
+                       }
+                       setLineNumbers(l);
+                       setLops(l);
                }
-               
-               setLineNumbers(l);
-               setLops(l);
+//             setLineNumbers(l);
+//             setLops(l);

Review Comment:
   the line numbers and lops should also be set for the TeeOp



##########
src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectOOCTee.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.hops.rewrite;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.hops.AggBinaryOp;
+import org.apache.sysds.hops.DataOp;
+import org.apache.sysds.hops.Hop;
+import org.apache.sysds.hops.ReorgOp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This Rewrite rule injects a Tee Operator for specific Out-Of-Core (OOC) 
patterns
+ * where a value or an intermediate result is shared twice. Since for OOC we 
data streams
+ * can only be consumed once.
+ *
+ * <p>
+ *   Pattern identified {@code t(X) %*% X}, where the data {@code X} will be 
shared by
+ *   {@code t(X)} and {@code %*%} multiplication.
+ * </p>
+ *
+ * The rewrite uses a stable two-pass approach:
+ * 1. <b>Find candidates (Read-Only):</b> Traverse the entire HOP DAG to 
identify candidates
+ * the fit the target pattern.
+ * 2. <b>Apply Rewrites (Modification):</b> Iterate over the collected 
candidate and put
+ * {@code TeeOp}, and safely rewire the graph.
+ */
+public class RewriteInjectOOCTee extends HopRewriteRule {
+
+       private static final Set<Long> rewrittenHops = new HashSet<>();
+       private static final Map<Long, Hop> handledHop = new HashMap<>();
+
+       // Maintain a list of candidates to rewrite in the second pass
+       private final List<Hop> rewriteCandidates = new ArrayList<>();
+
+       /**
+        * Handle a generic (last-level) hop DAG with multiple roots.
+        *
+        * @param roots high-level operator roots
+        * @param state program rewrite status
+        * @return list of high-level operators
+        */
+       @Override
+       public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, 
ProgramRewriteStatus state) {
+               if (roots == null) {
+                       return null;
+               }
+
+               // Clear candidates for this pass
+               rewriteCandidates.clear();
+
+               // PASS 1: Identify candidates without modifying the graph
+               for (Hop root : roots) {
+                       root.resetVisitStatus();
+                       findRewriteCandidates(root);
+               }
+
+               // PASS 2: Apply rewrites to identified candidates
+               for (Hop candidate : rewriteCandidates) {
+                       applyTopDownTeeRewrite(candidate);
+               }
+
+               return roots;
+       }
+
+       /**
+        * Handle a predicate hop DAG with exactly one root.
+        *
+        * @param root  high-level operator root
+        * @param state program rewrite status
+        * @return high-level operator
+        */
+       @Override
+       public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state) {
+               if (root == null) {
+                       return null;
+               }
+
+               // Clear candidates for this pass
+               rewriteCandidates.clear();
+
+               // PASS 1: Identify candidates without modifying the graph
+               root.resetVisitStatus();
+               findRewriteCandidates(root);
+
+               // PASS 2: Apply rewrites to identified candidates
+               for (Hop candidate : rewriteCandidates) {
+                       applyTopDownTeeRewrite(candidate);
+               }
+
+               return root;
+       }
+
+       /**
+        * First pass: Find candidates for rewrite without modifying the graph.
+        * This method traverses the graph and identifies nodes that need to be
+        * rewritten based on the transpose-matrix multiply pattern.
+        *
+        * @param hop current hop being examined
+        */
+       private void findRewriteCandidates(Hop hop) {
+               if (hop.isVisited()) {
+                       return;
+               }
+
+               // Mark as visited to avoid processing the same hop multiple 
times
+               hop.setVisited(true);
+
+               // Recursively traverse the graph (depth-first)
+               for (Hop input : hop.getInput()) {
+                       findRewriteCandidates(input);
+               }
+
+               // Check if this hop is a candidate for OOC Tee injection
+               if (isRewriteCandidate(hop)) {
+                       rewriteCandidates.add(hop);
+               }
+       }
+
+       /**
+        * Check if a hop should be considered for rewrite.
+        *
+        * @param hop the hop to check
+        * @return true if the hop meets all criteria for rewrite
+        */
+       private boolean isRewriteCandidate(Hop hop) {
+               // Skip if already handled
+               if (rewrittenHops.contains(hop.getHopID()) || 
handledHop.containsKey(hop.getHopID())) {
+                       return false;
+               }
+
+               boolean multipleConsumers = hop.getParent().size() > 1;
+               boolean isNotAlreadyTee = isNotAlreadyTee(hop);
+               boolean isOOCEnabled = DMLScript.USE_OOC;
+               boolean isTransposeMM = isTranposePattern(hop);
+               boolean isMatrix = hop.getDataType() == Types.DataType.MATRIX;
+
+               return isOOCEnabled && multipleConsumers && isNotAlreadyTee && 
isTransposeMM && isMatrix;
+       }
+
+       /**
+        * Second pass: Apply the TeeOp transformation to a candidate hop.
+        * This safely rewires the graph by creating a TeeOp node and 
placeholders.
+        *
+        * @param sharedInput the hop to be rewritten
+        */
+       private void applyTopDownTeeRewrite(Hop sharedInput) {
+               // Only process if not already handled
+               if (handledHop.containsKey(sharedInput.getHopID())) {
+                       return;
+               }
+
+               // Take a defensive copy of consumers before modifying the graph
+               ArrayList<Hop> consumers = new 
ArrayList<>(sharedInput.getParent());
+
+               // Create the new TeeOp with the original hop as input
+//             TeeOp teeOp = new TeeOp(sharedInput);
+               DataOp teeOp    = new DataOp("tee_out_" + sharedInput.getName(),
+                                               sharedInput.getDataType(),
+                                               sharedInput.getValueType(),
+                                               Types.OpOpData.TRANSIENTWRITE,
+                                               null,
+                                               sharedInput.getDim1(),
+                                               sharedInput.getDim2(),
+                                               sharedInput.getNnz(),
+                                               sharedInput.getBlocksize()
+               );
+
+               teeOp.setIsTeeOp(true);
+               HopRewriteUtils.addChildReference(teeOp, sharedInput);
+
+               // Rewire the graph: replace original connections with TeeOp 
outputs
+               int i = 0;
+               for (Hop consumer : consumers) {
+                       Hop placeholder = new DataOp("tee_out_" + 
sharedInput.getName() + "_" + i,
+                                                       
sharedInput.getDataType(),
+                                                       
sharedInput.getValueType(),
+                                                       
Types.OpOpData.TRANSIENTWRITE,
+                                                       null,
+                                                       sharedInput.getDim1(),
+                                                       sharedInput.getDim2(),
+                                                       sharedInput.getNnz(),
+                                                       
sharedInput.getBlocksize()
+                       );
+
+                       // Copy metadata
+                       placeholder.setBeginLine(sharedInput.getBeginLine());
+                       
placeholder.setBeginColumn(sharedInput.getBeginColumn());
+                       placeholder.setEndLine(sharedInput.getEndLine());
+                       placeholder.setEndColumn(sharedInput.getEndColumn());
+
+                       // Connect placeholder to TeeOp and consumer
+                       HopRewriteUtils.addChildReference(placeholder, teeOp);
+                       HopRewriteUtils.replaceChildReference(consumer, 
sharedInput, placeholder);
+
+                       i++;
+               }
+
+               // Record that we've handled this hop
+               handledHop.put(sharedInput.getHopID(), teeOp);
+               rewrittenHops.add(sharedInput.getHopID());
+       }
+
+       private boolean isNotAlreadyTee(Hop hop) {
+               if (hop.getParent().size() > 1) {
+                       for (Hop consumer : hop.getParent()) {
+                               if (consumer instanceof DataOp) {
+                                       return false;
+                               }
+                       }
+               }
+               return true;
+       }
+
+       private boolean isTranposePattern (Hop hop) {
+               boolean hasTransposeConsumer = false; // t(X)
+               boolean hasMatrixMultiplyConsumer = false; // %*%
+
+               for (Hop parent: hop.getParent()) {
+                       String opString = parent.getOpString();
+                       if (parent instanceof ReorgOp) {
+                               if (opString.contains("r'") || 
opString.contains("transpose")) {
+                                       hasTransposeConsumer = true;
+                               }
+                       }
+                       else if (parent instanceof AggBinaryOp)
+                               if (opString.contains("*") || 
opString.contains("ba+*")) {

Review Comment:
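   Same here as for the transpose check earlier in this method: please use the 
`HopRewriteUtils` helper methods instead of matching on the op string.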



##########
src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectOOCTee.java:
##########
@@ -175,7 +178,20 @@ private void applyTopDownTeeRewrite(Hop sharedInput) {
                ArrayList<Hop> consumers = new 
ArrayList<>(sharedInput.getParent());
 
                // Create the new TeeOp with the original hop as input
-               TeeOp teeOp = new TeeOp(sharedInput);
+//             TeeOp teeOp = new TeeOp(sharedInput);
+               DataOp teeOp    = new DataOp("tee_out_" + sharedInput.getName(),

Review Comment:
   No worries, these hop names are just for debugging; when generating 
instructions, we generate the _mvar, _fvar, and _var names accordingly.



##########
src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectOOCTee.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.hops.rewrite;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.hops.AggBinaryOp;
+import org.apache.sysds.hops.DataOp;
+import org.apache.sysds.hops.Hop;
+import org.apache.sysds.hops.ReorgOp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This Rewrite rule injects a Tee Operator for specific Out-Of-Core (OOC) 
patterns
+ * where a value or an intermediate result is shared twice. Since for OOC we 
data streams
+ * can only be consumed once.
+ *
+ * <p>
+ *   Pattern identified {@code t(X) %*% X}, where the data {@code X} will be 
shared by
+ *   {@code t(X)} and {@code %*%} multiplication.
+ * </p>
+ *
+ * The rewrite uses a stable two-pass approach:
+ * 1. <b>Find candidates (Read-Only):</b> Traverse the entire HOP DAG to 
identify candidates
+ * the fit the target pattern.
+ * 2. <b>Apply Rewrites (Modification):</b> Iterate over the collected 
candidate and put
+ * {@code TeeOp}, and safely rewire the graph.
+ */
+public class RewriteInjectOOCTee extends HopRewriteRule {
+
+       private static final Set<Long> rewrittenHops = new HashSet<>();
+       private static final Map<Long, Hop> handledHop = new HashMap<>();
+
+       // Maintain a list of candidates to rewrite in the second pass
+       private final List<Hop> rewriteCandidates = new ArrayList<>();
+
+       /**
+        * Handle a generic (last-level) hop DAG with multiple roots.
+        *
+        * @param roots high-level operator roots
+        * @param state program rewrite status
+        * @return list of high-level operators
+        */
+       @Override
+       public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, 
ProgramRewriteStatus state) {
+               if (roots == null) {
+                       return null;
+               }
+
+               // Clear candidates for this pass
+               rewriteCandidates.clear();
+
+               // PASS 1: Identify candidates without modifying the graph
+               for (Hop root : roots) {
+                       root.resetVisitStatus();
+                       findRewriteCandidates(root);
+               }
+
+               // PASS 2: Apply rewrites to identified candidates
+               for (Hop candidate : rewriteCandidates) {
+                       applyTopDownTeeRewrite(candidate);
+               }
+
+               return roots;
+       }
+
+       /**
+        * Handle a predicate hop DAG with exactly one root.
+        *
+        * @param root  high-level operator root
+        * @param state program rewrite status
+        * @return high-level operator
+        */
+       @Override
+       public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state) {
+               if (root == null) {
+                       return null;
+               }
+
+               // Clear candidates for this pass
+               rewriteCandidates.clear();
+
+               // PASS 1: Identify candidates without modifying the graph
+               root.resetVisitStatus();
+               findRewriteCandidates(root);
+
+               // PASS 2: Apply rewrites to identified candidates
+               for (Hop candidate : rewriteCandidates) {
+                       applyTopDownTeeRewrite(candidate);
+               }
+
+               return root;
+       }
+
+       /**
+        * First pass: Find candidates for rewrite without modifying the graph.
+        * This method traverses the graph and identifies nodes that need to be
+        * rewritten based on the transpose-matrix multiply pattern.
+        *
+        * @param hop current hop being examined
+        */
+       private void findRewriteCandidates(Hop hop) {
+               if (hop.isVisited()) {
+                       return;
+               }
+
+               // Mark as visited to avoid processing the same hop multiple 
times
+               hop.setVisited(true);
+
+               // Recursively traverse the graph (depth-first)
+               for (Hop input : hop.getInput()) {
+                       findRewriteCandidates(input);
+               }
+
+               // Check if this hop is a candidate for OOC Tee injection
+               if (isRewriteCandidate(hop)) {
+                       rewriteCandidates.add(hop);
+               }
+       }
+
+       /**
+        * Check if a hop should be considered for rewrite.
+        *
+        * @param hop the hop to check
+        * @return true if the hop meets all criteria for rewrite
+        */
+       private boolean isRewriteCandidate(Hop hop) {
+               // Skip if already handled
+               if (rewrittenHops.contains(hop.getHopID()) || 
handledHop.containsKey(hop.getHopID())) {
+                       return false;
+               }
+
+               boolean multipleConsumers = hop.getParent().size() > 1;
+               boolean isNotAlreadyTee = isNotAlreadyTee(hop);
+               boolean isOOCEnabled = DMLScript.USE_OOC;
+               boolean isTransposeMM = isTranposePattern(hop);
+               boolean isMatrix = hop.getDataType() == Types.DataType.MATRIX;
+
+               return isOOCEnabled && multipleConsumers && isNotAlreadyTee && 
isTransposeMM && isMatrix;
+       }
+
+       /**
+        * Second pass: Apply the TeeOp transformation to a candidate hop.
+        * This safely rewires the graph by creating a TeeOp node and 
placeholders.
+        *
+        * @param sharedInput the hop to be rewritten
+        */
+       private void applyTopDownTeeRewrite(Hop sharedInput) {
+               // Only process if not already handled
+               if (handledHop.containsKey(sharedInput.getHopID())) {
+                       return;
+               }
+
+               // Take a defensive copy of consumers before modifying the graph
+               ArrayList<Hop> consumers = new 
ArrayList<>(sharedInput.getParent());
+
+               // Create the new TeeOp with the original hop as input
+//             TeeOp teeOp = new TeeOp(sharedInput);
+               DataOp teeOp    = new DataOp("tee_out_" + sharedInput.getName(),
+                                               sharedInput.getDataType(),
+                                               sharedInput.getValueType(),
+                                               Types.OpOpData.TRANSIENTWRITE,
+                                               null,
+                                               sharedInput.getDim1(),
+                                               sharedInput.getDim2(),
+                                               sharedInput.getNnz(),
+                                               sharedInput.getBlocksize()
+               );
+
+               teeOp.setIsTeeOp(true);
+               HopRewriteUtils.addChildReference(teeOp, sharedInput);
+
+               // Rewire the graph: replace original connections with TeeOp 
outputs
+               int i = 0;
+               for (Hop consumer : consumers) {
+                       Hop placeholder = new DataOp("tee_out_" + 
sharedInput.getName() + "_" + i,
+                                                       
sharedInput.getDataType(),
+                                                       
sharedInput.getValueType(),
+                                                       
Types.OpOpData.TRANSIENTWRITE,
+                                                       null,
+                                                       sharedInput.getDim1(),
+                                                       sharedInput.getDim2(),
+                                                       sharedInput.getNnz(),
+                                                       
sharedInput.getBlocksize()
+                       );
+
+                       // Copy metadata
+                       placeholder.setBeginLine(sharedInput.getBeginLine());
+                       
placeholder.setBeginColumn(sharedInput.getBeginColumn());
+                       placeholder.setEndLine(sharedInput.getEndLine());
+                       placeholder.setEndColumn(sharedInput.getEndColumn());
+
+                       // Connect placeholder to TeeOp and consumer
+                       HopRewriteUtils.addChildReference(placeholder, teeOp);
+                       HopRewriteUtils.replaceChildReference(consumer, 
sharedInput, placeholder);
+
+                       i++;
+               }
+
+               // Record that we've handled this hop
+               handledHop.put(sharedInput.getHopID(), teeOp);
+               rewrittenHops.add(sharedInput.getHopID());
+       }
+
+       private boolean isNotAlreadyTee(Hop hop) {
+               if (hop.getParent().size() > 1) {
+                       for (Hop consumer : hop.getParent()) {
+                               if (consumer instanceof DataOp) {
+                                       return false;
+                               }
+                       }
+               }
+               return true;
+       }
+
+       private boolean isTranposePattern (Hop hop) {
+               boolean hasTransposeConsumer = false; // t(X)
+               boolean hasMatrixMultiplyConsumer = false; // %*%
+
+               for (Hop parent: hop.getParent()) {
+                       String opString = parent.getOpString();
+                       if (parent instanceof ReorgOp) {
+                               if (opString.contains("r'") || 
opString.contains("transpose")) {

Review Comment:
   Please always use the following utils: `HopRewriteUtils.isReorg(parent, 
ReOrgOp.TRANS)`



##########
src/main/java/org/apache/sysds/lops/Tee.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.lops;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+
+public class Tee extends Lop {
+
+       public static final String OPCODE = "tee";
+       /**
+        * Constructor to be invoked by base class.
+        *
+        * @param input1  lop type
+        * @param dt data type of the output
+        * @param vt value type of the output
+        */
+       public Tee(Lop input1, DataType dt, ValueType vt) {
+               super(Lop.Type.Tee, dt, vt);
+               this.addInput(input1);
+               input1.addOutput(this);
+               lps.setProperties(inputs, Types.ExecType.OOC);
+       }
+
+       @Override
+       public String toString() {
+               return "Operation = Tee";
+       }
+
+       @Override
+       public String getInstructions(String input1, String outputs) {
+
+               String[] out = outputs.split(Lop.OPERAND_DELIMITOR);
+               String output2 = outputs + "_copy";
+
+               // This method generates the instruction string: 
OOC°tee°input°output1°output2...

Review Comment:
   I would recommend not making tee an operation with two outputs but just one 
(copying the input). After thinking a bit more about it, maybe the entire 
notion of a tee was not a good idea. Potentially, we could make the stream 
resettable: the tee operator would take a normal stream and output a 
resettable stream, which can then be consumed by arbitrarily many other operations 
(this also solves the problem of complex control-flow interactions where the 
multiple parents are not immediately visible).



##########
src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectOOCTee.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.hops.rewrite;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.hops.AggBinaryOp;
+import org.apache.sysds.hops.DataOp;
+import org.apache.sysds.hops.Hop;
+import org.apache.sysds.hops.ReorgOp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This rewrite rule injects a Tee operator for specific Out-Of-Core (OOC)
+ * patterns where a value or an intermediate result is shared twice, because
+ * in OOC execution a data stream can only be consumed once.
+ *
+ * <p>
+ *   Pattern identified {@code t(X) %*% X}, where the data {@code X} will be 
shared by
+ *   {@code t(X)} and {@code %*%} multiplication.
+ * </p>
+ *
+ * The rewrite uses a stable two-pass approach:
+ * 1. <b>Find candidates (Read-Only):</b> Traverse the entire HOP DAG to identify
+ * candidates that fit the target pattern.
+ * 2. <b>Apply Rewrites (Modification):</b> Iterate over the collected candidates,
+ * inject a {@code TeeOp} for each, and safely rewire the graph.
+ */
+public class RewriteInjectOOCTee extends HopRewriteRule {
+
+       private static final Set<Long> rewrittenHops = new HashSet<>();
+       private static final Map<Long, Hop> handledHop = new HashMap<>();
+
+       // Maintain a list of candidates to rewrite in the second pass
+       private final List<Hop> rewriteCandidates = new ArrayList<>();
+
+       /**
+        * Handle a generic (last-level) hop DAG with multiple roots.
+        *
+        * @param roots high-level operator roots
+        * @param state program rewrite status
+        * @return list of high-level operators
+        */
+       @Override
+       public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, 
ProgramRewriteStatus state) {
+               if (roots == null) {
+                       return null;
+               }
+
+               // Clear candidates for this pass
+               rewriteCandidates.clear();
+
+               // PASS 1: Identify candidates without modifying the graph
+               for (Hop root : roots) {
+                       root.resetVisitStatus();
+                       findRewriteCandidates(root);
+               }
+
+               // PASS 2: Apply rewrites to identified candidates
+               for (Hop candidate : rewriteCandidates) {
+                       applyTopDownTeeRewrite(candidate);
+               }
+
+               return roots;
+       }
+
+       /**
+        * Handle a predicate hop DAG with exactly one root.
+        *
+        * @param root  high-level operator root
+        * @param state program rewrite status
+        * @return high-level operator
+        */
+       @Override
+       public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state) {
+               if (root == null) {
+                       return null;
+               }
+
+               // Clear candidates for this pass
+               rewriteCandidates.clear();
+
+               // PASS 1: Identify candidates without modifying the graph
+               root.resetVisitStatus();
+               findRewriteCandidates(root);
+
+               // PASS 2: Apply rewrites to identified candidates
+               for (Hop candidate : rewriteCandidates) {
+                       applyTopDownTeeRewrite(candidate);
+               }
+
+               return root;
+       }
+
+       /**
+        * First pass: Find candidates for rewrite without modifying the graph.
+        * This method traverses the graph and identifies nodes that need to be
+        * rewritten based on the transpose-matrix multiply pattern.
+        *
+        * @param hop current hop being examined
+        */
+       private void findRewriteCandidates(Hop hop) {
+               if (hop.isVisited()) {
+                       return;
+               }
+
+               // Mark as visited to avoid processing the same hop multiple 
times
+               hop.setVisited(true);
+
+               // Recursively traverse the graph (depth-first)
+               for (Hop input : hop.getInput()) {
+                       findRewriteCandidates(input);
+               }
+
+               // Check if this hop is a candidate for OOC Tee injection
+               if (isRewriteCandidate(hop)) {
+                       rewriteCandidates.add(hop);
+               }
+       }
+
+       /**
+        * Check if a hop should be considered for rewrite.
+        *
+        * @param hop the hop to check
+        * @return true if the hop meets all criteria for rewrite
+        */
+       private boolean isRewriteCandidate(Hop hop) {
+               // Skip if already handled
+               if (rewrittenHops.contains(hop.getHopID()) || 
handledHop.containsKey(hop.getHopID())) {
+                       return false;
+               }
+
+               boolean multipleConsumers = hop.getParent().size() > 1;
+               boolean isNotAlreadyTee = isNotAlreadyTee(hop);
+               boolean isOOCEnabled = DMLScript.USE_OOC;
+               boolean isTransposeMM = isTranposePattern(hop);
+               boolean isMatrix = hop.getDataType() == Types.DataType.MATRIX;
+
+               return isOOCEnabled && multipleConsumers && isNotAlreadyTee && 
isTransposeMM && isMatrix;

Review Comment:
   It's fine to currently only do it for transpose - it will not affect other 
things, and we can generalize the approach later.



##########
src/main/java/org/apache/sysds/hops/DataOp.java:
##########
@@ -32,12 +32,8 @@
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.rewrite.HopRewriteUtils;
-import org.apache.sysds.lops.Data;
-import org.apache.sysds.lops.Federated;
-import org.apache.sysds.lops.Lop;
+import org.apache.sysds.lops.*;
 import org.apache.sysds.common.Types.ExecType;
-import org.apache.sysds.lops.LopsException;
-import org.apache.sysds.lops.Sql;

Review Comment:
   yes exactly - let's try to avoid the wildcard imports (to avoid ambiguity).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@systemds.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to