This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 6e85843 [SYSTEMDS-2868] Fix spark transformencode (tokens with spaces)
6e85843 is described below
commit 6e858434fdd6715925999e41c02a248fe988387d
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Feb 20 22:36:26 2021 +0100
[SYSTEMDS-2868] Fix spark transformencode (tokens with spaces)
The Spark transformencode reuses the text-cell frame reader to read the
meta data in a consistent form after the parallel pre-pass that determines
recode maps, bin boundaries, and missing-value (mv) imputation values for
all columns. The text-cell readers, however, are derived from Matrix Market,
which does not support quoting. Therefore, the Spark transformencode fails
on tokens that contain spaces, because spaces are the text-cell delimiters.
We now sanitize the tokens (replacing spaces) before the central
consolidation and subsequently desanitize them back into their original
representations. The temporary replacement for a space is a character
sequence that is very unlikely to appear in practice.
To facilitate this fix, the patch also adds a general mapInplace frame
operation that applies an arbitrary String-to-String lambda function to
the cells of an existing frame in place.
---
...ltiReturnParameterizedBuiltinSPInstruction.java | 9 ++++---
.../sysds/runtime/io/FrameReaderTextCell.java | 5 ++--
.../sysds/runtime/matrix/data/FrameBlock.java | 10 ++++++++
.../apache/sysds/runtime/transform/TfUtils.java | 29 +++++++++++++---------
.../transform/TransformFrameEncodeDecodeTest.java | 10 ++++----
.../functions/transform/input/homes3/homes.csv | 6 ++---
6 files changed, 44 insertions(+), 25 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index bb290fc..e797eee 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -50,6 +50,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.transform.TfUtils;
import org.apache.sysds.runtime.transform.encode.Encoder;
import org.apache.sysds.runtime.transform.encode.EncoderBin;
import org.apache.sysds.runtime.transform.encode.EncoderComposite;
@@ -138,6 +139,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction
extends ComputationSPI
FrameBlock meta =
reader.readFrameFromHDFS(fometa.getFileName(), accMax.value(),
fo.getNumColumns());
meta.recomputeColumnCardinality(); //recompute num
distinct items per column
meta.setColumnNames((colnames!=null)?colnames:meta.getColumnNames());
+ meta.mapInplace(v -> TfUtils.desanitizeSpaces(v));
//due to format TEXT
//step 2: transform apply (similar to spark
transformapply)
//compute omit offset map for block shifts
@@ -326,8 +328,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction
extends ComputationSPI
//handle recode maps
if( _encoder.isEncoder(colID, EncoderRecode.class) ) {
while( iter.hasNext() ) {
+ String token =
TfUtils.sanitizeSpaces(iter.next().toString());
sb.append(rowID).append('
').append(scolID).append(' ');
-
sb.append(EncoderRecode.constructRecodeMapEntry(iter.next().toString(), rowID));
+
sb.append(EncoderRecode.constructRecodeMapEntry(token, rowID));
ret.add(sb.toString());
sb.setLength(0);
rowID++;
@@ -440,7 +443,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction
extends ComputationSPI
mode = e.getKey();
max = e.getValue();
}
- ret.add("-2 " + colix + " " + mode);
+ ret.add("-2 " + colix + " " +
TfUtils.sanitizeSpaces(mode));
}
//compute global mean of categorical feature
else if( _encoder.getMethod(colix) ==
MVMethod.GLOBAL_MEAN ) {
@@ -458,7 +461,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction
extends ComputationSPI
//pass-through constant label
else if( _encoder.getMethod(colix) == MVMethod.CONSTANT
) {
if( iter.hasNext() )
- ret.add("-2 " + colix + " " +
iter.next().getMvValue());
+ ret.add("-2 " + colix + " " +
TfUtils.sanitizeSpaces(iter.next().getMvValue()));
}
return ret.iterator();
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCell.java
b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCell.java
index 8db958d..bd65220 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCell.java
@@ -38,6 +38,7 @@ import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.transform.TfUtils;
import org.apache.sysds.runtime.util.FastStringTokenizer;
import org.apache.sysds.runtime.util.UtilFunctions;
@@ -125,7 +126,7 @@ public class FrameReaderTextCell extends FrameReader
row = st.nextInt()-1;
col = st.nextInt()-1;
if( row == -3 )
-
dest.getColumnMetadata(col).setMvValue(st.nextToken());
+
dest.getColumnMetadata(col).setMvValue(TfUtils.desanitizeSpaces(st.nextToken()));
else if( row == -2 )
dest.getColumnMetadata(col).setNumDistinct(st.nextLong());
else
@@ -172,7 +173,7 @@ public class FrameReaderTextCell extends FrameReader
row = st.nextInt()-1;
col = st.nextInt()-1;
if( row == -3 )
-
dest.getColumnMetadata(col).setMvValue(st.nextToken());
+
dest.getColumnMetadata(col).setMvValue(TfUtils.desanitizeSpaces(st.nextToken()));
else if (row == -2)
dest.getColumnMetadata(col).setNumDistinct(st.nextLong());
else
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index d82d40b..cf17980 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@@ -2107,6 +2108,15 @@ public class FrameBlock implements CacheBlock,
Externalizable {
mergedFrame.appendRow(rowTemp1);
return mergedFrame;
}
+
+ public void mapInplace(Function<String, String> fun) {
+ for(int j=0; j<getNumColumns(); j++)
+ for(int i=0; i<getNumRows(); i++) {
+ Object tmp = get(i, j);
+ set(i, j, (tmp == null) ? tmp :
+
UtilFunctions.objectToObject(_schema[j], fun.apply(tmp.toString())));
+ }
+ }
public FrameBlock map (String lambdaExpr){
if(!lambdaExpr.contains("->")) {
diff --git a/src/main/java/org/apache/sysds/runtime/transform/TfUtils.java
b/src/main/java/org/apache/sysds/runtime/transform/TfUtils.java
index 635d730..d895fd1 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/TfUtils.java
@@ -20,7 +20,6 @@
package org.apache.sysds.runtime.transform;
import java.io.Serializable;
-import java.util.regex.Pattern;
import org.apache.sysds.lops.Lop;
@@ -72,19 +71,12 @@ public class TfUtils implements Serializable
public static final String JSON_CONSTS = "constants";
public static final String JSON_NBINS = "numbins";
- private String _headerLine = null;
- private boolean _hasHeader;
- private Pattern _delim = null;
- private String _delimString = null;
+ public static final String EXT_SPACE = Lop.DATATYPE_PREFIX
+ + Lop.INSTRUCTION_DELIMITOR +Lop.DATATYPE_PREFIX;
+
private String[] _NAstrings = null;
- private int _numInputCols = -1;
- public String getHeader() { return _headerLine; }
- public boolean hasHeader() { return _hasHeader; }
- public String getDelimString() { return _delimString; }
- public Pattern getDelim() { return _delim; }
- public String[] getNAStrings() { return _NAstrings; }
- public long getNumCols() { return _numInputCols; }
+ public String[] getNAStrings() { return _NAstrings; }
/**
* Function that checks if the given string is one of NA strings.
@@ -103,4 +95,17 @@ public class TfUtils implements Serializable
}
return false;
}
+
+ public static String sanitizeSpaces(String token) {
+ //due to the use of textcell (derived from matrix market), we
cannot
+ //simply quote the token as its parsed without support for
quoting.
+ //Hence, we replace with a very unlikely character sequence
+ return token != null && token.contains(" ") ?
+ token.replaceAll(" ", EXT_SPACE): token;
+ }
+
+ public static String desanitizeSpaces(String token) {
+ return token != null && token.contains(EXT_SPACE) ?
+ token.replaceAll(EXT_SPACE, " ") : token;
+ }
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeDecodeTest.java
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeDecodeTest.java
index a6f98f9..6f38ad9 100644
---
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeDecodeTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeDecodeTest.java
@@ -41,11 +41,11 @@ public class TransformFrameEncodeDecodeTest extends
AutomatedTestBase
private final static String TEST_CLASS_DIR = TEST_DIR +
TransformFrameEncodeDecodeTest.class.getSimpleName() + "/";
//dataset and transform tasks without missing values
- private final static String DATASET1 = "homes3/homes.csv";
- private final static String SPEC1 =
"homes3/homes.tfspec_recode.json";
- private final static String SPEC1b =
"homes3/homes.tfspec_recode2.json";
- private final static String SPEC2 =
"homes3/homes.tfspec_dummy.json";
- private final static String SPEC2b =
"homes3/homes.tfspec_dummy2.json";
+ private final static String DATASET1 = "homes3/homes.csv";
+ private final static String SPEC1 =
"homes3/homes.tfspec_recode.json";
+ private final static String SPEC1b =
"homes3/homes.tfspec_recode2.json";
+ private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
+ private final static String SPEC2b =
"homes3/homes.tfspec_dummy2.json";
public enum TransformType {
RECODE,
diff --git a/src/test/scripts/functions/transform/input/homes3/homes.csv
b/src/test/scripts/functions/transform/input/homes3/homes.csv
index aeed601..880f76c 100644
--- a/src/test/scripts/functions/transform/input/homes3/homes.csv
+++ b/src/test/scripts/functions/transform/input/homes3/homes.csv
@@ -68,13 +68,13 @@
zipcode,district,sqft,numbedrooms,numbathrooms,floors,view,saleprice,askingprice
95141,east,3903,1,2.5,2,FALSE,976,981
91312,south,1076,2,2.5,1,FALSE,597,600
96334,west,1719,1,1.5,3,FALSE,738,742
-94555,north,1439,4,1.5,1,FALSE,589,592
+94555,north east,1439,4,1.5,1,FALSE,589,592
91312,east,1961,2,3,1,TRUE,775,778
94555,north,2471,1,1.5,1,TRUE,753,756
91312,west,3930,4,2.5,2,FALSE,1004,1009
95141,south,2833,1,1,1,FALSE,718,721
96334,south,2580,4,1,2,TRUE,816,820
-94555,south,2169,3,2.5,3,TRUE,904,908
+94555,south east ,2169,3,2.5,3,TRUE,904,908
95141,east,3329,4,3,3,TRUE,1064,1069
96334,south,3392,4,2,3,TRUE,1026,1031
96334,east,3688,6,2.5,3,FALSE,1032,1037
@@ -85,7 +85,7 @@
zipcode,district,sqft,numbedrooms,numbathrooms,floors,view,saleprice,askingprice
96334,east,1732,3,2,1,TRUE,700,703
96334,south,2188,4,2,1,TRUE,767,771
96334,south,3750,6,2,2,FALSE,963,967
-98755,north,2331,1,1.5,1,TRUE,740,743
+98755,north east ,2331,1,1.5,1,TRUE,740,743
94555,north,1512,4,3,3,TRUE,854,858
98755,north,3352,3,3,3,FALSE,1014,1018
94555,south,3426,3,2.5,2,FALSE,937,941