This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 23cc33a [MINOR] Fix fed local write
23cc33a is described below
commit 23cc33ab2f0dfa7cbb0db6b5087a5b255dea6bfa
Author: baunsgaard <[email protected]>
AuthorDate: Thu Oct 29 10:36:53 2020 +0100
[MINOR] Fix fed local write
This commit fixes the federated write so that it no longer
pulls the data to the master when a write is called with the
federated file format.
(Writing federated matrices that have been modified is still not supported.)
---
.../controlprogram/caching/CacheableData.java | 52 ++++++++-------
.../instructions/fed/FEDInstructionUtils.java | 11 ++++
.../instructions/fed/VariableFEDInstruction.java | 74 ++++++++++++++++++++++
.../federated/io/FederatedWriterTest.java | 10 ++-
4 files changed, 121 insertions(+), 26 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index b69596c..e598493 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -457,7 +457,7 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
private synchronized T acquireReadIntern() {
if ( !isAvailableToRead() )
throw new DMLRuntimeException("MatrixObject not
available to read.");
-
+
//get object from cache
if( _data == null )
getCache();
@@ -482,6 +482,7 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
if( _data==null && isEmpty(true) ) {
try {
if( isFederated() ) {
+ LOG.error("Federated pull all data");
_data = readBlobFromFederated(
_fedMapping );
//mark for initial local write despite
read operation
@@ -747,7 +748,6 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
if( LOG.isTraceEnabled() )
LOG.trace("Export data "+hashCode()+" "+fName);
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-
//prevent concurrent modifications
if ( !isAvailableToRead() )
throw new DMLRuntimeException("MatrixObject not
available to read.");
@@ -783,29 +783,34 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
{
// CASE 1: dirty in-mem matrix or pWrite w/ different
format (write matrix to fname; load into memory if evicted)
// a) get the matrix
- if( isEmpty(true) )
- {
- //read data from HDFS if required (never read
before), this applies only to pWrite w/ different output formats
- //note: for large rdd outputs, we compile
dedicated writespinstructions (no need to handle this here)
- try
+ boolean federatedWrite =
outputFormat.contains("federated");
+ if( ! federatedWrite){
+
+ if( isEmpty(true))
{
- if( getRDDHandle()==null ||
getRDDHandle().allowsShortCircuitRead() )
- _data = readBlobFromHDFS(
_hdfsFileName );
- else if( getRDDHandle() != null )
- _data = readBlobFromRDD(
getRDDHandle(), new MutableBoolean() );
- else
- _data = readBlobFromFederated(
getFedMapping() );
-
- setDirty(false);
- }
- catch (IOException e) {
- throw new DMLRuntimeException("Reading
of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
+ //read data from HDFS if required
(never read before), this applies only to pWrite w/ different output formats
+ //note: for large rdd outputs, we
compile dedicated writespinstructions (no need to handle this here)
+ try
+ {
+ if( getRDDHandle()==null ||
getRDDHandle().allowsShortCircuitRead() )
+ _data =
readBlobFromHDFS( _hdfsFileName );
+ else if( getRDDHandle() != null
)
+ _data =
readBlobFromRDD( getRDDHandle(), new MutableBoolean() );
+ else {
+ _data =
readBlobFromFederated( getFedMapping() );
+ }
+
+ setDirty(false);
+ }
+ catch (IOException e) {
+ throw new
DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+")
failed.", e);
+ }
}
+ //get object from cache
+ if( _data == null )
+ getCache();
+ acquire( false, _data==null ); //incl. read
matrix if evicted
}
- //get object from cache
- if( _data == null )
- getCache();
- acquire( false, _data==null ); //incl. read matrix if
evicted
// b) write the matrix
try {
@@ -818,7 +823,8 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
throw new DMLRuntimeException("Export to " +
fName + " failed.", e);
}
finally {
- release();
+ if(!federatedWrite)
+ release();
}
}
else if( pWrite ) // pwrite with same output format
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 8ec3432..795db11 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -36,6 +36,7 @@ import
org.apache.sysds.runtime.instructions.cp.MultiReturnParameterizedBuiltinC
import
org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
import org.apache.sysds.runtime.instructions.cp.ReorgCPInstruction;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import
org.apache.sysds.runtime.instructions.cp.VariableCPInstruction.VariableOperationCode;
import org.apache.sysds.runtime.instructions.spark.AggregateUnarySPInstruction;
import org.apache.sysds.runtime.instructions.spark.AppendGAlignedSPInstruction;
import org.apache.sysds.runtime.instructions.spark.AppendGSPInstruction;
@@ -126,6 +127,16 @@ public class FEDInstructionUtils {
if( mo.isFederated() )
fedinst =
ReorgFEDInstruction.parseInstruction(rinst.getInstructionString());
}
+ else if(inst instanceof VariableCPInstruction ){
+ VariableCPInstruction ins = (VariableCPInstruction)
inst;
+
+ if(ins.getVariableOpcode() ==
VariableOperationCode.Write
+ &&
ins.getInput3().getName().contains("federated")){
+ fedinst =
VariableFEDInstruction.parseInstruction(ins);
+ }
+
+ }
+
//set thread id for federated context management
if( fedinst != null ) {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
new file mode 100644
index 0000000..b425dee
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction.VariableOperationCode;
+import org.apache.sysds.runtime.lineage.LineageItem;
+import org.apache.sysds.runtime.lineage.LineageTraceable;
+
+/**
+ * Federated wrapper around a {@link VariableCPInstruction}.
+ * <p>
+ * Only the {@code Write} opcode is currently supported; it is executed by delegating to the
+ * wrapped CP instruction. Lineage is likewise taken from the wrapped instruction.
+ */
+public class VariableFEDInstruction extends FEDInstruction implements LineageTraceable {
+	private static final Log LOG = LogFactory.getLog(VariableFEDInstruction.class.getName());
+
+	/** The CP variable instruction this federated instruction delegates to. */
+	private final VariableCPInstruction _in;
+
+	/**
+	 * Wraps the given CP variable instruction.
+	 *
+	 * @param in the CP variable instruction to delegate to
+	 */
+	protected VariableFEDInstruction(VariableCPInstruction in) {
+		// NOTE(review): the federated instruction type is passed as null; confirm FEDInstruction
+		// does not rely on it for this instruction.
+		super(null, in.getOperator(), in.getOpcode(), in.getInstructionString());
+		_in = in;
+	}
+
+	/**
+	 * Creates a federated variable instruction that wraps the given CP instruction.
+	 *
+	 * @param cpInstruction the CP variable instruction to wrap
+	 * @return a new federated wrapper around {@code cpInstruction}
+	 */
+	public static VariableFEDInstruction parseInstruction(VariableCPInstruction cpInstruction) {
+		return new VariableFEDInstruction(cpInstruction);
+	}
+
+	/**
+	 * Executes the wrapped instruction according to its variable opcode.
+	 *
+	 * @param ec the execution context
+	 * @throws DMLRuntimeException if the opcode is not supported in federated mode
+	 */
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		VariableOperationCode opcode = _in.getVariableOpcode();
+		switch(opcode) {
+			case Write:
+				processWriteInstruction(ec);
+				break;
+			default:
+				throw new DMLRuntimeException("Unsupported Opcode for federated Variable Instruction : " + opcode);
+		}
+	}
+
+	/**
+	 * Executes a write by delegating to the wrapped CP instruction.
+	 *
+	 * @param ec the execution context
+	 */
+	private void processWriteInstruction(ExecutionContext ec) {
+		// Routine trace message: log at debug rather than error level so that every federated
+		// write does not show up as an error in the logs.
+		LOG.debug("processing write command federated");
+		// TODO Add the write command to the federated sites if the matrix has been modified.
+		// This has to be done while appending some string to the federated output file.
+		// Furthermore, the paths of the files written on the federated sites should be returned
+		// to the controller.
+		_in.processInstruction(ec);
+	}
+
+	/**
+	 * Returns the lineage item of the wrapped CP instruction.
+	 *
+	 * @param ec the execution context
+	 * @return the lineage of the wrapped instruction
+	 */
+	@Override
+	public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+		return _in.getLineageItem(ec);
+	}
+}
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
index 14dd466..9a913b9 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
@@ -21,6 +21,8 @@ package org.apache.sysds.test.functions.federated.io;
import java.util.Arrays;
import java.util.Collection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -36,7 +38,7 @@ import org.junit.runners.Parameterized;
@net.jcip.annotations.NotThreadSafe
public class FederatedWriterTest extends AutomatedTestBase {
- // private static final Log LOG =
LogFactory.getLog(FederatedWriterTest.class.getName());
+ private static final Log LOG =
LogFactory.getLog(FederatedWriterTest.class.getName());
private final static String TEST_DIR = "functions/federated/";
private final static String TEST_NAME = "FederatedWriterTest";
private final static String TEST_CLASS_DIR = TEST_DIR +
FederatedWriterTest.class.getSimpleName() + "/";
@@ -96,8 +98,10 @@ public class FederatedWriterTest extends AutomatedTestBase {
// Run reader and write a federated json to enable the rest of the
test
fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/FederatedReaderTestCreate.dml";
programArgs = new String[] {"-stats", "-explain","-args",
input("X1"), input("X2"), port1 + "", port2 + "", input("X.json")};
- // String writer = runTest(null).toString();
- runTest(null);
+ String writer = runTest(null).toString();
+ // runTest(null);
+ LOG.error(writer);
+ LOG.error("Writing Done");
// Run reference dml script with normal matrix
fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/FederatedReaderTest.dml";