Repository: incubator-systemml Updated Branches: refs/heads/master 8a05574c8 -> 821c5f50d
[SYSTEMML-827] Fix csv frame readers (handling of quoted/escape tokens) This patch introduces a safe IOUtilFunctions.splitCSV which correctly handles quotes according to RFC4180. It is implemented from scratch to avoid additional dependencies and to keep this functionality as simple as possible. Performance-wise the splitCSV is even slightly faster than the default String.split - for 10M three column strings (10 repetitions): (1) String.split: 1455s vs (2) splitCSV: 1224s. Includes fixes for (1) computeCSVSize in FrameReaderTextCSV (num rows), and (2) CSVToBinaryBlockFunction in FrameRDDConverterUtils (trim). Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/821c5f50 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/821c5f50 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/821c5f50 Branch: refs/heads/master Commit: 821c5f50d9f2b07d195d7a36e8a669ed96d61390 Parents: 8a05574 Author: Matthias Boehm <[email protected]> Authored: Fri Jul 22 00:55:39 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Jul 22 00:55:39 2016 -0700 ---------------------------------------------------------------------- .../spark/utils/FrameRDDConverterUtils.java | 12 +- .../sysml/runtime/io/FrameReaderTextCSV.java | 4 +- .../sysml/runtime/io/IOUtilFunctions.java | 101 ++++++++++++++- .../sysml/runtime/matrix/data/FrameBlock.java | 4 +- .../functions/jmlc/FrameReadMetaTest.java | 4 +- .../TransformCSVFrameEncodeDecodeTest.java | 124 +++++++++++++++++++ .../transform/TransformCSVFrameEncodeDecode.dml | 32 +++++ .../transform/input/csv_mix/quotes1.csv | 30 +++++ .../functions/transform/ZPackageSuite.java | 1 + 9 files changed, 301 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index c53dd34..052d3f3 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -91,7 +91,7 @@ public class FrameRDDConverterUtils || tmpStr.startsWith(TfUtils.TXMTD_NDPREFIX); tmpStr = (metaHeader) ? tmpStr.substring(tmpStr.indexOf(delim)+1) : tmpStr; long rlen = tmp.count() - (hasHeader ? 1 : 0) - (metaHeader ? 2 : 0); - long clen = tmpStr.split(delim).length; + long clen = IOUtilFunctions.splitCSV(tmpStr, delim).length; mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); } @@ -425,18 +425,18 @@ public class FrameRDDConverterUtils while( arg0.hasNext() ) { Tuple2<Text,Long> tmp = arg0.next(); - String row = tmp._1().toString(); + String row = tmp._1().toString().trim(); long rowix = tmp._2(); if(_hasHeader && rowix == 0) { //Skip header - _colnames = Arrays.asList(row.split(_delim)); + _colnames = Arrays.asList(IOUtilFunctions.splitCSV(row, _delim)); continue; } if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) { - _mvMeta = Arrays.asList(Arrays.copyOfRange(row.split(_delim), 1, (int)_clen+1)); + _mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1)); continue; } else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) { - _ndMeta = Arrays.asList(Arrays.copyOfRange(row.split(_delim), 1, (int)_clen+1)); + _ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1)); continue; } @@ -451,7 +451,7 @@ 
public class FrameRDDConverterUtils } //process row data - String[] parts = IOUtilFunctions.split(row, _delim); + String[] parts = IOUtilFunctions.splitCSV(row, _delim); boolean emptyFound = false; mb[0].appendRow(parts); iRowsInBlock++; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java index bf3d79f..b608991 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java @@ -171,7 +171,7 @@ public class FrameReaderTextCSV extends FrameReader { String cellStr = value.toString().trim(); emptyValuesFound = false; col = 0; - String[] parts = IOUtilFunctions.split(cellStr, delim); + String[] parts = IOUtilFunctions.splitCSV(cellStr, delim); //parse frame meta data (missing values / num distinct) if( parts[0].equals(TfUtils.TXMTD_MVPREFIX) || parts[0].equals(TfUtils.TXMTD_NDPREFIX) ) { @@ -232,7 +232,7 @@ public class FrameReaderTextCSV extends FrameReader int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim()); //compute number of rows - int nrow = -1; + int nrow = 0; for( int i=0; i<splits.length; i++ ) { RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index 275cfe4..49e8098 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ 
b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -47,6 +48,8 @@ public class IOUtilFunctions { private static final Log LOG = LogFactory.getLog(UtilFunctions.class.getName()); + private static final char CSV_QUOTE_CHAR = '"'; + /** * * @param io @@ -138,6 +141,102 @@ public class IOUtilFunctions } /** + * Splits a string by a specified delimiter into all tokens, including empty + * while respecting the rules for quotes and escapes defined in RFC4180. + * + * NOTE: use StringEscapeUtils.unescapeCsv(tmp) if needed afterwards. + * + * @param str + * @param delim + * @return + */ + public static String[] splitCSV(String str, String delim) + { + // check for empty input + if( str == null || str.isEmpty() ) + return new String[]{""}; + + // scan string and create individual tokens + ArrayList<String> tokens = new ArrayList<String>(); + int from = 0, to = 0; + int len = str.length(); + while( from < len ) { // for all tokens + if( str.charAt(from) == CSV_QUOTE_CHAR ) { + to = str.indexOf(CSV_QUOTE_CHAR, from+1); + // handle escaped inner quotes, e.g. "aa""a" + while( to+1 < len && str.charAt(to+1)==CSV_QUOTE_CHAR ) + to = str.indexOf(CSV_QUOTE_CHAR, to+2); // to + "" + to += 1; // last " + } + else if(str.regionMatches(from, delim, 0, delim.length())) { + to = from; // empty string + } + else { // default: unquoted non-empty + to = str.indexOf(delim, from+1); + } + + // slice out token and advance position + to = (to >= 0) ? 
to : len; + tokens.add(str.substring(from, to)); + from = to + delim.length(); + } + + // handle empty string at end + if( from == len ) + tokens.add(""); + + // return tokens + return tokens.toArray(new String[0]); + } + + /** + * Counts the number of tokens defined by the given delimiter, respecting + * the rules for quotes and escapes defined in RFC4180. + * + * @param str + * @param delim + * @return + */ + public static int countTokensCSV(String str, String delim) + { + // check for empty input + if( str == null || str.isEmpty() ) + return 1; + + // scan string and compute num tokens + int numTokens = 0; + int from = 0, to = 0; + int len = str.length(); + while( from < len ) { // for all tokens + if( str.charAt(from) == CSV_QUOTE_CHAR ) { + to = str.indexOf(CSV_QUOTE_CHAR, from+1); + // handle escaped inner quotes, e.g. "aa""a" + while( to+1 < len && str.charAt(to+1)==CSV_QUOTE_CHAR ) + to = str.indexOf(CSV_QUOTE_CHAR, to+2); // to + "" + to += 1; // last " + } + else if(str.regionMatches(from, delim, 0, delim.length())) { + to = from; // empty string + } + else { // default: unquoted non-empty + to = str.indexOf(delim, from+1); + } + + //increase counter and advance position + to = (to >= 0) ? 
to : len; + from = to + delim.length(); + numTokens++; + } + + // handle empty string at end + if( from == len ) + numTokens++; + + // return number of tokens + return numTokens; + } + + /** * * @param input * @return @@ -218,7 +317,7 @@ public class IOUtilFunctions if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) reader.next(key, value); if( !row.isEmpty() ) - ncol = StringUtils.countMatches(row, delim) + 1; + ncol = IOUtilFunctions.countTokensCSV(row, delim); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index a4f826c..78fa165 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -40,6 +40,7 @@ import org.apache.sysml.lops.Lop; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.UtilFunctions; @@ -871,7 +872,8 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable for( int i=0; i<getNumRows(); i++ ) { Object val = ldata.get(i); if( val != null ) { - String[] tmp = val.toString().split(Lop.DATATYPE_PREFIX); + String[] tmp = IOUtilFunctions.splitCSV( + val.toString(), Lop.DATATYPE_PREFIX); map.put(tmp[0], Long.parseLong(tmp[1])); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java ---------------------------------------------------------------------- diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java index b0eb3ce..5311101 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java @@ -33,6 +33,7 @@ import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.lops.Lop; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.transform.meta.TfMetaUtils; @@ -188,7 +189,8 @@ public class FrameReadMetaTest extends AutomatedTestBase if( collist.contains(j+1) && tmp[j] != null ) { if( ret[j] == null ) ret[j] = new HashMap<String,Long>(); - String[] parts = tmp[j].toString().split(Lop.DATATYPE_PREFIX); + String[] parts = IOUtilFunctions.splitCSV( + tmp[j].toString(), Lop.DATATYPE_PREFIX); ret[j].put(parts[0], Long.parseLong(parts[1])); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformCSVFrameEncodeDecodeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformCSVFrameEncodeDecodeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformCSVFrameEncodeDecodeTest.java new file mode 100644 index 0000000..9ee3d5f --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformCSVFrameEncodeDecodeTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.transform; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.io.FrameReader; +import org.apache.sysml.runtime.io.FrameReaderFactory; +import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; +import org.apache.sysml.utils.Statistics; + +public class TransformCSVFrameEncodeDecodeTest extends AutomatedTestBase +{ + private final static String TEST_NAME1 = "TransformCSVFrameEncodeDecode"; + private final static String TEST_DIR = "functions/transform/"; + private final static String TEST_CLASS_DIR = TEST_DIR + TransformCSVFrameEncodeDecodeTest.class.getSimpleName() + "/"; + + //dataset and transform tasks without missing values + private final static String DATASET = "csv_mix/quotes1.csv"; + + @Override + 
public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME1, + new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) ); + } + + @Test + public void testHomesRecodeIDsSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv"); + } + + @Test + public void testHomesRecodeIDsSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv"); + } + + @Test + public void testHomesRecodeIDsHybridCSV() { + runTransformTest(RUNTIME_PLATFORM.HYBRID_SPARK, "csv"); + } + /** + * + * @param rt + * @param ofmt + * @param dataset + */ + private void runTransformTest( RUNTIME_PLATFORM rt, String ofmt ) + { + //set runtime platform + RUNTIME_PLATFORM rtold = rtplatform; + boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK; + rtplatform = rt; + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + if( !ofmt.equals("csv") ) + throw new RuntimeException("Unsupported test output format"); + + try + { + getAndLoadTestConfiguration(TEST_NAME1); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[]{"-explain","-args", + HOME + "input/" + DATASET, output("R") }; + + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true; + runTest(true, false, null, -1); + + //read input/output and compare + FrameReader reader1 = FrameReaderFactory.createFrameReader(InputInfo.CSVInputInfo, + new CSVFileFormatProperties(false, ",", false)); + FrameBlock fb1 = reader1.readFrameFromHDFS(HOME + "input/" + DATASET, -1L, -1L); + FrameReader reader2 = FrameReaderFactory.createFrameReader(InputInfo.CSVInputInfo); + FrameBlock fb2 = reader2.readFrameFromHDFS(output("R"), -1L, -1L); + String[][] R1 = DataConverter.convertToStringFrame(fb1); + String[][] R2 = DataConverter.convertToStringFrame(fb2); + TestUtils.compareFrames(R1, R2, 
R1.length, R1[0].length); + + if( rt == RUNTIME_PLATFORM.HYBRID_SPARK ) { + Assert.assertEquals("Wrong number of executed Spark instructions: " + + Statistics.getNoOfExecutedSPInst(), new Long(2), new Long(Statistics.getNoOfExecutedSPInst())); + } + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + finally { + rtplatform = rtold; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/test/scripts/functions/transform/TransformCSVFrameEncodeDecode.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/TransformCSVFrameEncodeDecode.dml b/src/test/scripts/functions/transform/TransformCSVFrameEncodeDecode.dml new file mode 100644 index 0000000..50ec83c --- /dev/null +++ b/src/test/scripts/functions/transform/TransformCSVFrameEncodeDecode.dml @@ -0,0 +1,32 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +F1 = read($1, data_type="frame", format="csv"); +jspec = "{\"ids\": true, \"recode\": [1,2,3]}"; + +[X, M] = transformencode(target=F1, spec=jspec); + +if(1==1){} + +F2 = transformdecode(target=X, spec=jspec, meta=M); + +write(F2, $2, format="csv"); + http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/test/scripts/functions/transform/input/csv_mix/quotes1.csv ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/input/csv_mix/quotes1.csv b/src/test/scripts/functions/transform/input/csv_mix/quotes1.csv new file mode 100644 index 0000000..33936c7 --- /dev/null +++ b/src/test/scripts/functions/transform/input/csv_mix/quotes1.csv @@ -0,0 +1,30 @@ +aaa,bbb,ccc +aaa,bbb,ccc +zzz,yyy,xxx +zzz,yyy,xxx +"aaa","bbb","ccc" +"aaa","bbb","ccc" +"zzz","yyy","xxx" +"zzz","yyy","xxx" +"""zzz","y""yy","xx""x" +"""zzz","y""yy","xx""x" +""",zzz","y"",""yy","xx""x" +""",zzz","y"",""yy","xx""x" +"aaa",""",""yy","xx""x" +"aaa",""",""yy","xx""x" +"aaa","b""""""bb""""b","ccc" +"aaa","b""""""bb""""b","ccc" +"a,a,a","b,,b,,b","c,cc" +"a,a,a","b,,b,,b","c,cc" +"a,aa",bbb,"cc,c" +"a,aa",bbb,"cc,c" +aaa,bbb,ccc +aaa,bbb,ccc +"a'aa",b'bb,c'cc +"a'aa",b'bb,c'cc +"a\\naa","b\n\nbb","ccc" +"a\\naa","b\n\nbb","ccc" +"a\t\r\naa","b\tb,b","c\tc,c" +"a\t\r\naa","b\tb,b","c\tc,c" +aaa,""",,,b,,","c,c,c" +aaa,""",,,b,,","c,c,c" http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/821c5f50/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java index bdcc36b..10a2518 100644 --- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java @@ -29,6 +29,7 @@ import org.junit.runners.Suite; RunTest.class, ScalingTest.class, TransformAndApplyTest.class, + TransformCSVFrameEncodeDecodeTest.class, TransformEncodeDecodeTest.class, TransformFrameApplyTest.class, TransformFrameEncodeApplyTest.class,
