This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 23cc33a  [MINOR] Fix fed local write
23cc33a is described below

commit 23cc33ab2f0dfa7cbb0db6b5087a5b255dea6bfa
Author: baunsgaard <[email protected]>
AuthorDate: Thu Oct 29 10:36:53 2020 +0100

    [MINOR] Fix fed local write
    
    This commit fixes the federated write so that it does not pull
    the data to the master when a write is called with the federated
    file format.
    
    (This commit still does not allow writing modified federated matrices.)
---
 .../controlprogram/caching/CacheableData.java      | 52 ++++++++-------
 .../instructions/fed/FEDInstructionUtils.java      | 11 ++++
 .../instructions/fed/VariableFEDInstruction.java   | 74 ++++++++++++++++++++++
 .../federated/io/FederatedWriterTest.java          | 10 ++-
 4 files changed, 121 insertions(+), 26 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index b69596c..e598493 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -457,7 +457,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
        private synchronized T acquireReadIntern() {
                if ( !isAvailableToRead() )
                        throw new DMLRuntimeException("MatrixObject not 
available to read.");
-               
+
                //get object from cache
                if( _data == null )
                        getCache();
@@ -482,6 +482,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                if( _data==null && isEmpty(true) ) {
                        try {
                                if( isFederated() ) {
+                                       LOG.error("Federated pull all data");
                                        _data = readBlobFromFederated( 
_fedMapping );
                                        
                                        //mark for initial local write despite 
read operation
@@ -747,7 +748,6 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                if( LOG.isTraceEnabled() )
                        LOG.trace("Export data "+hashCode()+" "+fName);
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-               
                //prevent concurrent modifications
                if ( !isAvailableToRead() )
                        throw new DMLRuntimeException("MatrixObject not 
available to read.");
@@ -783,29 +783,34 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                {
                        // CASE 1: dirty in-mem matrix or pWrite w/ different 
format (write matrix to fname; load into memory if evicted)
                        // a) get the matrix
-                       if( isEmpty(true) )
-                       {
-                               //read data from HDFS if required (never read 
before), this applies only to pWrite w/ different output formats
-                               //note: for large rdd outputs, we compile 
dedicated writespinstructions (no need to handle this here) 
-                               try
+                       boolean federatedWrite = 
outputFormat.contains("federated");
+                       if( ! federatedWrite){
+
+                               if( isEmpty(true))
                                {
-                                       if( getRDDHandle()==null || 
getRDDHandle().allowsShortCircuitRead() )
-                                               _data = readBlobFromHDFS( 
_hdfsFileName );
-                                       else if( getRDDHandle() != null )
-                                               _data = readBlobFromRDD( 
getRDDHandle(), new MutableBoolean() );
-                                       else 
-                                               _data = readBlobFromFederated( 
getFedMapping() );
-                                       
-                                       setDirty(false);
-                               }
-                               catch (IOException e) {
-                                       throw new DMLRuntimeException("Reading 
of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
+                                       //read data from HDFS if required 
(never read before), this applies only to pWrite w/ different output formats
+                                       //note: for large rdd outputs, we 
compile dedicated writespinstructions (no need to handle this here) 
+                                       try
+                                       {
+                                               if( getRDDHandle()==null || 
getRDDHandle().allowsShortCircuitRead() )
+                                                       _data = 
readBlobFromHDFS( _hdfsFileName );
+                                               else if( getRDDHandle() != null 
)
+                                                       _data = 
readBlobFromRDD( getRDDHandle(), new MutableBoolean() );
+                                               else {
+                                                       _data = 
readBlobFromFederated( getFedMapping() );
+                                               }
+                                               
+                                               setDirty(false);
+                                       }
+                                       catch (IOException e) {
+                                               throw new 
DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") 
failed.", e);
+                                       }
                                }
+                               //get object from cache
+                               if( _data == null )
+                                       getCache();
+                               acquire( false, _data==null ); //incl. read 
matrix if evicted
                        }
-                       //get object from cache
-                       if( _data == null )
-                               getCache();
-                       acquire( false, _data==null ); //incl. read matrix if 
evicted
                        
                        // b) write the matrix 
                        try {
@@ -818,7 +823,8 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                                throw new DMLRuntimeException("Export to " + 
fName + " failed.", e);
                        }
                        finally {
-                               release();
+                               if(!federatedWrite)
+                                       release();
                        }
                }
                else if( pWrite ) // pwrite with same output format
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 8ec3432..795db11 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -36,6 +36,7 @@ import 
org.apache.sysds.runtime.instructions.cp.MultiReturnParameterizedBuiltinC
 import 
org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ReorgCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import 
org.apache.sysds.runtime.instructions.cp.VariableCPInstruction.VariableOperationCode;
 import org.apache.sysds.runtime.instructions.spark.AggregateUnarySPInstruction;
 import org.apache.sysds.runtime.instructions.spark.AppendGAlignedSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.AppendGSPInstruction;
@@ -126,6 +127,16 @@ public class FEDInstructionUtils {
                        if( mo.isFederated() )
                                fedinst = 
ReorgFEDInstruction.parseInstruction(rinst.getInstructionString());
                }
+               else if(inst instanceof VariableCPInstruction ){
+                       VariableCPInstruction ins = (VariableCPInstruction) 
inst;
+
+                       if(ins.getVariableOpcode() == 
VariableOperationCode.Write 
+                               && 
ins.getInput3().getName().contains("federated")){
+                               fedinst = 
VariableFEDInstruction.parseInstruction(ins);
+                       }
+
+               }
+
                
                //set thread id for federated context management
                if( fedinst != null ) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
new file mode 100644
index 0000000..b425dee
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import 
org.apache.sysds.runtime.instructions.cp.VariableCPInstruction.VariableOperationCode;
+import org.apache.sysds.runtime.lineage.LineageItem;
+import org.apache.sysds.runtime.lineage.LineageTraceable;
+
+public class VariableFEDInstruction extends FEDInstruction implements 
LineageTraceable {
+    private static final Log LOG = 
LogFactory.getLog(VariableFEDInstruction.class.getName());
+
+    private final VariableCPInstruction _in;
+
+    protected VariableFEDInstruction(VariableCPInstruction in) {
+        super(null, in.getOperator(), in.getOpcode(), 
in.getInstructionString());
+        _in = in;
+    }
+
+    public static VariableFEDInstruction 
parseInstruction(VariableCPInstruction cpInstruction) {
+        return new VariableFEDInstruction(cpInstruction);
+    }
+
+    @Override
+    public void processInstruction(ExecutionContext ec) {
+        VariableOperationCode opcode = _in.getVariableOpcode();
+        switch(opcode) {
+
+            case Write:
+                processWriteInstruction(ec);
+                break;
+
+            default:
+                throw new DMLRuntimeException("Unsupported Opcode for 
federated Variable Instruction : " + opcode);
+        }
+    }
+
+    private void processWriteInstruction(ExecutionContext ec) {
+        LOG.error("processing write command federated");
+        // TODO Add a write command to the federated site if the matrix has been 
modified.
+        // This has to be done while appending some string to the federated 
output file.
+        // Furthermore, the path of the output file on the federated site should 
be returned
+        // to the controller.
+        _in.processInstruction(ec);
+    }
+
+    @Override
+    public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+        return _in.getLineageItem(ec);
+    }
+
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
index 14dd466..9a913b9 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
@@ -21,6 +21,8 @@ package org.apache.sysds.test.functions.federated.io;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -36,7 +38,7 @@ import org.junit.runners.Parameterized;
 @net.jcip.annotations.NotThreadSafe
 public class FederatedWriterTest extends AutomatedTestBase {
 
-    // private static final Log LOG = 
LogFactory.getLog(FederatedWriterTest.class.getName());
+    private static final Log LOG = 
LogFactory.getLog(FederatedWriterTest.class.getName());
     private final static String TEST_DIR = "functions/federated/";
     private final static String TEST_NAME = "FederatedWriterTest";
     private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedWriterTest.class.getSimpleName() + "/";
@@ -96,8 +98,10 @@ public class FederatedWriterTest extends AutomatedTestBase {
             // Run reader and write a federated json to enable the rest of the 
test
             fullDMLScriptName = SCRIPT_DIR + 
"functions/federated/io/FederatedReaderTestCreate.dml";
             programArgs = new String[] {"-stats", "-explain","-args", 
input("X1"), input("X2"), port1 + "", port2 + "", input("X.json")};
-            // String writer = runTest(null).toString();
-            runTest(null);
+            String writer = runTest(null).toString();
+            // runTest(null);
+            LOG.error(writer);
+            LOG.error("Writing Done");
 
             // Run reference dml script with normal matrix
             fullDMLScriptName = SCRIPT_DIR + 
"functions/federated/io/FederatedReaderTest.dml";

Reply via email to