This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ff6274788 [minor] Compression SP Schema apply
5ff6274788 is described below

commit 5ff6274788bf4f5c9b42c2e43494c7f99451571c
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Wed Feb 5 12:28:07 2025 +0100

    [minor] Compression SP Schema apply
    
    Closes #2215
---
 .../runtime/instructions/SPInstructionParser.java  |  2 ++
 .../spark/BinaryFrameFrameSPInstruction.java       | 25 +++++++++++++++++++++-
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
index 5c72b85436..5014c0ac30 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
@@ -39,6 +39,7 @@ import org.apache.sysds.lops.WeightedSquaredLossR;
 import org.apache.sysds.lops.WeightedUnaryMM;
 import org.apache.sysds.lops.WeightedUnaryMMR;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.spark.AggregateTernarySPInstruction;
 import org.apache.sysds.runtime.instructions.spark.AggregateUnarySPInstruction;
@@ -195,6 +196,7 @@ public class SPInstructionParser extends InstructionParser
                String2SPInstructionType.put( "freplicate", SPType.Binary);
                String2SPInstructionType.put( "mapdropInvalidLength", SPType.Binary);
                String2SPInstructionType.put( "valueSwap", SPType.Binary);
+               String2SPInstructionType.put( "applySchema"  , SPType.Binary);
                String2SPInstructionType.put( "_map", SPType.Ternary); // _map refers to the operation map
                // Relational Instruction Opcodes
                String2SPInstructionType.put( "=="   , SPType.Binary);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
index 6f6232e71a..dfad7a165e 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
@@ -59,6 +59,11 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
                        // Attach result frame with FrameBlock associated with output_name
                        sec.releaseFrameInput(input2.getName());
                }
+               else if(getOpcode().equals("applySchema")){
+                       Broadcast<FrameBlock> fb = sec.getSparkContext().broadcast(sec.getFrameInput(input2.getName()));
+                       out = in1.mapValues(new applySchema(fb.getValue()));
+                       sec.releaseFrameInput(input2.getName());
+               }
                else {
                        JavaPairRDD<Long, FrameBlock> in2 = sec.getFrameBinaryBlockRDDHandleForVariable(input2.getName());
                        // create output frame
@@ -70,7 +75,9 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
                //set output RDD and maintain dependencies
                sec.setRDDHandleForVariable(output.getName(), out);
                sec.addLineageRDD(output.getName(), input1.getName());
-               if( !getOpcode().equals("dropInvalidType")  && !getOpcode().equals("valueSwap"))
+               if(!getOpcode().equals("dropInvalidType") && //
+                       !getOpcode().equals("valueSwap") && //
+                       !getOpcode().equals("applySchema"))
                        sec.addLineageRDD(output.getName(), input2.getName());
        }
 
@@ -116,4 +123,20 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
                        return arg0.valueSwap(schema_frame);
                }
        }
+
+
+       private static class applySchema implements Function<FrameBlock, FrameBlock>{
+               private static final long serialVersionUID = 58504021316402L;
+
+               private FrameBlock schema;
+
+               public applySchema(FrameBlock schema ) {
+                       this.schema = schema;
+               }
+
+               @Override
+               public FrameBlock call(FrameBlock arg0) throws Exception {
+                       return arg0.applySchema(schema);
+               }
+       }
 }

Reply via email to