This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 6272b0eb64 [SYSTEMDS-3814] Fix invalid rename of csv input to output 
files
6272b0eb64 is described below

commit 6272b0eb6483d65fb8c02fdfd1871fbcce3b731b
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Jan 17 19:48:47 2025 +0100

    [SYSTEMDS-3814] Fix invalid rename of csv input to output files
    
    This patch fixes a remaining invalid rename of persistently read input
    csv files to csv output files, which "deletes" the input file. So far,
    we based this information on the PREAD variable name, but certain
    assignments lose this information. We now properly capture this
    information at createvar instructions, preserve it inside all
    matrices, frames, and tensors, and thus ensure robustness for all
    kinds of programs.
---
 .../controlprogram/caching/CacheableData.java      |   9 ++
 .../instructions/cp/VariableCPInstruction.java     |  16 +--
 .../sysds/test/functions/io/RenameIssueTest.java   | 134 +++++++++++++++++++++
 src/test/scripts/functions/io/Rename.dml           |  31 +++++
 4 files changed, 182 insertions(+), 8 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index dc8ca3aec6..eba22e7f15 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -183,6 +183,7 @@ public abstract class CacheableData<T extends 
CacheBlock<?>> extends Data
        
        /** The name of HDFS file in which the data is backed up. */
        protected String _hdfsFileName = null; // file name and path
+       protected boolean _isPRead = false; //persistent read, must not be 
deleted
        
        /** 
         * Flag that indicates whether or not hdfs file exists.It is used 
@@ -285,6 +286,14 @@ public abstract class CacheableData<T extends 
CacheBlock<?>> extends Data
                return _hdfsFileName;
        }
        
+       public boolean isPersistentRead() { //true if marked as persistent read, i.e., the backing file must not be deleted
+               return _isPRead;
+       }
+       
+       public void setPersistentRead(boolean pread) { //marks (true) or unmarks (false) this object as backed by a persistent read
+               _isPRead = pread;
+       }
+       
        public long getUniqueID() {
                return _uniqueID;
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index 6ed292bbb0..c1397381ba 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -706,7 +706,7 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                        case MATRIX: {
                                String fname = createUniqueFilename();
                                MatrixObject obj = new 
MatrixObject(getInput1().getValueType(), fname);
-                               setCacheableDataFields(obj);
+                               setCacheableDataFields(obj, 
getInput1().getName());
                                obj.setUpdateType(_updateType);
                                obj.setMarkForLinCache(true);
                                ec.setVariable(getInput1().getName(), obj);
@@ -717,14 +717,14 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                        case TENSOR: {
                                String fname = createUniqueFilename();
                                TensorObject obj = new 
TensorObject(getInput1().getValueType(), fname);
-                               setCacheableDataFields(obj);
+                               setCacheableDataFields(obj, 
getInput1().getName());
                                ec.setVariable(getInput1().getName(), obj);
                                break;
                        }
                        case FRAME: {
                                String fname = createUniqueFilename();
                                FrameObject fobj = new FrameObject(fname);
-                               setCacheableDataFields(fobj);
+                               setCacheableDataFields(fobj, 
getInput1().getName());
                                if( _schema != null )
                                        fobj.setSchema(_schema); //after 
metadata
                                ec.setVariable(getInput1().getName(), fobj);
@@ -757,13 +757,14 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                return fname;
        }
 
-       private void setCacheableDataFields(CacheableData<?> obj){
+       private void setCacheableDataFields(CacheableData<?> obj, String 
varname){ //varname: name of the variable being created (all callers pass getInput1().getName())
                //clone metadata because it is updated on copy-on-write, 
otherwise there
                //is potential for hidden side effects between variables.
                obj.setMetaData((MetaData)metadata.clone());
                obj.enableCleanup(!getInput1().getName()
                        .startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX));
                obj.setFileFormatProperties(_formatProperties);
+               
obj.setPersistentRead(varname.startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX)); //NOTE(review): enableCleanup above still reads getInput1().getName(); callers pass the same name, consider deriving both flags from varname
        }
 
        /**
@@ -960,7 +961,7 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
 
        /**
         * Handler for CastAsFrameVariable instruction
-   *
+        *
         * @param ec execution context
         */
        private void processCastAsFrameVariableInstruction(ExecutionContext ec){
@@ -1018,6 +1019,7 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
         * @param ec execution context
         */
        private void processCopyInstruction(ExecutionContext ec) {
+               
                // get source variable
                Data dd = ec.getVariable(getInput1().getName());
 
@@ -1142,9 +1144,7 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                        try {
                                FileFormat fmt = 
((MetaDataFormat)mo.getMetaData()).getFileFormat();
                                DataCharacteristics dc = 
(mo.getMetaData()).getDataCharacteristics();
-                               if( fmt == FileFormat.CSV
-                                       && 
!getInput1().getName().startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX) )
-                               {
+                               if( fmt == FileFormat.CSV && 
!mo.isPersistentRead() ) {
                                        WriterTextCSV writer = new 
WriterTextCSV((FileFormatPropertiesCSV)fprop);
                                        writer.addHeaderToCSV(mo.getFileName(), 
fname, dc.getRows(), dc.getCols());
                                }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/io/RenameIssueTest.java 
b/src/test/java/org/apache/sysds/test/functions/io/RenameIssueTest.java
new file mode 100644
index 0000000000..e7e975f304
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/io/RenameIssueTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RenameIssueTest extends AutomatedTestBase {
+       //SYSTEMDS-3814: writing data derived from a persistent read must never rename (i.e., delete) the input file
+       protected static final Log LOG = LogFactory.getLog(RenameIssueTest.class.getName());
+
+       private final static String TEST_NAME1 = "Rename";
+       private final static String TEST_DIR = "functions/io/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + RenameIssueTest.class.getSimpleName() + "/";
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"L","R1"}) );
+       }
+
+       @Test
+       public void testCSVSinglenode() {
+               runRenameTest(FileFormat.CSV, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testCSVHybrid() {
+               runRenameTest(FileFormat.CSV, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testCSVSpark() {
+               runRenameTest(FileFormat.CSV, ExecMode.SPARK);
+       }
+       
+       @Test
+       public void testTextSinglenode() {
+               runRenameTest(FileFormat.TEXT, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testTextHybrid() {
+               runRenameTest(FileFormat.TEXT, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testTextSpark() {
+               runRenameTest(FileFormat.TEXT, ExecMode.SPARK);
+       }
+       
+       @Test
+       public void testBinarySinglenode() {
+               runRenameTest(FileFormat.BINARY, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testBinaryHybrid() {
+               runRenameTest(FileFormat.BINARY, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testBinarySpark() {
+               runRenameTest(FileFormat.BINARY, ExecMode.SPARK);
+       }
+       //Writes input A in format fmt, runs Rename.dml under the given exec mode, and asserts that A still exists.
+       private void runRenameTest(FileFormat fmt, ExecMode mode)
+       {
+               ExecMode modeOld = setExecMode(mode);
+               
+               try {
+                       TestConfiguration config = getTestConfiguration(TEST_NAME1);
+                       loadTestConfiguration(config);
+                       
+                       MatrixBlock a = DataConverter.convertToMatrixBlock(getRandomMatrix(7, 7, -1, 1, 0.5, -1));
+                       MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt);
+                       writer.writeMatrixToHDFS(a, input("A"), 
+                               a.getNumRows(), a.getNumColumns(), 1000, a.getNonZeros()); //blocksize 1000 matches the metadata below
+                       HDFSTool.writeMetaDataFile(input("A")+".mtd", ValueType.FP64,
+                               new MatrixCharacteristics(7,7,1000), fmt);
+                       
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+                       programArgs = new String[]{"-explain", 
+                               "-args", input("A"), fmt.toString().toLowerCase(), output("B")};
+                       runTest(true, false, null, -1);
+                       
+                       //check file existence (no rename to output)
+                       Assert.assertTrue("input was renamed or deleted: "+input("A"), new File(input("A")).exists());
+                       Assert.assertTrue("output was not written: "+output("B"), new File(output("B")).exists());
+               } 
+               catch (IOException e) {
+                       //fail with the original cause attached instead of only printing it
+                       throw new RuntimeException(e);
+               }
+               finally {
+                       resetExecMode(modeOld);
+               }
+       }
+}
diff --git a/src/test/scripts/functions/io/Rename.dml 
b/src/test/scripts/functions/io/Rename.dml
new file mode 100644
index 0000000000..14b114bba4
--- /dev/null
+++ b/src/test/scripts/functions/io/Rename.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X1 = read($1); # persistently read input; the test asserts this file still exists after the writes below
+
+Xa = X1; # alias of the persistent read via plain assignment
+for(i in 1:2) {
+  write(Xa, $3, format=$2); # in iteration 1, Xa still refers to the data read from $1
+  while(FALSE){} #write first (empty loop presumably forces a statement-block cut before the rbind)
+  Xa = rbind(Xa, X1); # from iteration 2 on, Xa is a newly computed matrix
+  print("Creating and writing replicated dataset ["+i+"]");
+}
+

Reply via email to