Repository: flink
Updated Branches:
  refs/heads/master 06b2acf79 -> 0548a93df


http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 5ccabf5..56b1dd0 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -44,15 +44,15 @@ public class VarLengthStringParserTest {
                StringValue s = new StringValue();
                
                int startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 9);
                assertTrue(s.getValue().equals("abcdefgh"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 11);
                assertTrue(s.getValue().equals("i"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 18);
                assertTrue(s.getValue().equals("jklmno"));
                
@@ -60,18 +60,18 @@ public class VarLengthStringParserTest {
                // check single field not terminated
                recBytes = "abcde".getBytes();
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 5);
                assertTrue(s.getValue().equals("abcde"));
                
                // check last field not terminated
                recBytes = "abcde|fg".getBytes();
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 6);
                assertTrue(s.getValue().equals("abcde"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 8);
                assertTrue(s.getValue().equals("fg"));
        }
@@ -84,15 +84,15 @@ public class VarLengthStringParserTest {
                StringValue s = new StringValue();
                
                int startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 11);
                assertTrue(s.getValue().equals("abcdefgh"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 15);
                assertTrue(s.getValue().equals("i"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 24);
                assertTrue(s.getValue().equals("jklmno"));
                
@@ -100,40 +100,40 @@ public class VarLengthStringParserTest {
                // check single field not terminated
                recBytes = "\"abcde\"".getBytes();
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 7);
                assertTrue(s.getValue().equals("abcde"));
                
                // check last field not terminated
                recBytes = "\"abcde\"|\"fg\"".getBytes();
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 8);
                assertTrue(s.getValue().equals("abcde"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 12);
                assertTrue(s.getValue().equals("fg"));
                
                // check delimiter in quotes 
                recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes();
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 11);
                assertTrue(s.getValue().equals("abcde|fg"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 26);
                assertTrue(s.getValue().equals("hij|kl|mn|op"));
                
                // check delimiter in quotes last field not terminated
                recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes();
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 11);
                assertTrue(s.getValue().equals("abcde|fg"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 25);
                assertTrue(s.getValue().equals("hij|kl|mn|op"));
        }
@@ -146,15 +146,15 @@ public class VarLengthStringParserTest {
                StringValue s = new StringValue();
                
                int startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 13);
                assertTrue(s.getValue().equals("abcdefgh"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 25);
                assertTrue(s.getValue().equals("i"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 39);
                assertTrue(s.getValue().equals("jklmno"));
                
@@ -163,29 +163,29 @@ public class VarLengthStringParserTest {
                s = new StringValue();
                
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 13);
                assertTrue(s.getValue().equals("abcdefgh"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 25);
                assertTrue(s.getValue().equals("i"));
                
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 38);
                assertTrue(s.getValue().equals("jklmno"));
                
                // check single field not terminated
                recBytes = "  \t\"abcde\"\t\t  \t ".getBytes();
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 16);
                assertTrue(s.getValue().equals("abcde"));
                
                // check single field terminated
                recBytes = "  \t\"abcde\"\t\t  \t |".getBytes();
                startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos == 17);
                assertTrue(s.getValue().equals("abcde"));
        }
@@ -198,7 +198,7 @@ public class VarLengthStringParserTest {
                StringValue s = new StringValue();
                
                int startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, '|', s);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos < 0);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
index 5f49bd3..73733fd 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
@@ -313,7 +313,7 @@ public class KMeans {
                if(fileOutput) {
                        // read points from CSV file
                        return env.readCsvFile(pointsPath)
-                                               .fieldDelimiter(' ')
+                                               .fieldDelimiter(" ")
                                                .includeFields(true, true)
                                                .types(Double.class, 
Double.class)
                                                .map(new TuplePointConverter());
@@ -325,7 +325,7 @@ public class KMeans {
        private static DataSet<Centroid> 
getCentroidDataSet(ExecutionEnvironment env) {
                if(fileOutput) {
                        return env.readCsvFile(centersPath)
-                                               .fieldDelimiter(' ')
+                                               .fieldDelimiter(" ")
                                                .includeFields(true, true, true)
                                                .types(Integer.class, 
Double.class, Double.class)
                                                .map(new 
TupleCentroidConverter());

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index 1519a35..cb99e23 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -234,7 +234,7 @@ public class ConnectedComponents implements 
ProgramDescription {
        private static DataSet<Tuple2<Long, Long>> 
getEdgeDataSet(ExecutionEnvironment env) {
                
                if(fileOutput) {
-                       return env.readCsvFile(edgesPath).fieldDelimiter(' 
').types(Long.class, Long.class); 
+                       return env.readCsvFile(edgesPath).fieldDelimiter(" 
").types(Long.class, Long.class);
                } else {
                        return 
ConnectedComponentsData.getDefaultEdgeDataSet(env);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
index 984b5a5..2b65ad6 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
@@ -218,7 +218,7 @@ public class EnumTrianglesBasic {
        private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
                if(fileOutput) {
                        return env.readCsvFile(edgePath)
-                                               .fieldDelimiter(' ')
+                                               .fieldDelimiter(" ")
                                                .includeFields(true, true)
                                                .types(Integer.class, 
Integer.class)
                                                .map(new TupleEdgeConverter());

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
index afbca1e..3f05a2b 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
@@ -342,7 +342,7 @@ public class EnumTrianglesOpt {
        private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
                if(fileOutput) {
                        return env.readCsvFile(edgePath)
-                                               .fieldDelimiter(' ')
+                                               .fieldDelimiter(" ")
                                                .includeFields(true, true)
                                                .types(Integer.class, 
Integer.class)
                                                .map(new TupleEdgeConverter());

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
index 83cecc2..4e4d9ad 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
@@ -263,7 +263,7 @@ public class PageRankBasic {
                if(fileOutput) {
                        return env
                                                .readCsvFile(pagesInputPath)
-                                                       .fieldDelimiter(' ')
+                                                       .fieldDelimiter(" ")
                                                        .lineDelimiter("\n")
                                                        .types(Long.class)
                                                .map(new 
MapFunction<Tuple1<Long>, Long>() {
@@ -278,7 +278,7 @@ public class PageRankBasic {
        private static DataSet<Tuple2<Long, Long>> 
getLinksDataSet(ExecutionEnvironment env) {
                if(fileOutput) {
                        return env.readCsvFile(linksInputPath)
-                                               .fieldDelimiter(' ')
+                                               .fieldDelimiter(" ")
                                                .lineDelimiter("\n")
                                                .types(Long.class, Long.class);
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index f532a95..abe96c8 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -147,7 +147,7 @@ public class TransitiveClosureNaive implements 
ProgramDescription {
        private static DataSet<Tuple2<Long, Long>> 
getEdgeDataSet(ExecutionEnvironment env) {
 
                if(fileOutput) {
-                       return env.readCsvFile(edgesPath).fieldDelimiter(' 
').types(Long.class, Long.class);
+                       return env.readCsvFile(edgesPath).fieldDelimiter(" 
").types(Long.class, Long.class);
                } else {
                        return 
ConnectedComponentsData.getDefaultEdgeDataSet(env);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
index 1ee2b80..9694647 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -297,7 +297,7 @@ public class LinearRegression {
                if(fileOutput) {
                        // read data from CSV file
                        return env.readCsvFile(dataPath)
-                                       .fieldDelimiter(' ')
+                                       .fieldDelimiter(" ")
                                        .includeFields(true, true)
                                        .types(Double.class, Double.class)
                                        .map(new TupleDataConverter());

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index 4192910..09b88fa 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -125,7 +125,7 @@ public class EmptyFieldsCountAccumulator {
                } else {
                        source = env
                                        .readCsvFile(filePath)
-                                       .fieldDelimiter(';')
+                                       .fieldDelimiter(";")
                                        .types(String.class, String.class, 
String.class);
 
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index 436a3d4..3dc0472 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -205,28 +205,28 @@ public class TPCHQuery10 {
        
        private static DataSet<Tuple5<Integer, String, String, Integer, 
Double>> getCustomerDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(customerPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("11110100")
                                        .types(Integer.class, String.class, 
String.class, Integer.class, Double.class);
        }
        
        private static DataSet<Tuple3<Integer, Integer, String>> 
getOrdersDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(ordersPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("110010000")
                                        .types(Integer.class, Integer.class, 
String.class);
        }
 
        private static DataSet<Tuple4<Integer, Double, Double, String>> 
getLineitemDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(lineitemPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("1000011010000000")
                                        .types(Integer.class, Double.class, 
Double.class, String.class);
        }
        
        private static DataSet<Tuple2<Integer, String>> 
getNationsDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(nationPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("1100")
                                        .types(Integer.class, String.class);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index 46161b4..9a6e58c 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -250,21 +250,21 @@ public class TPCHQuery3 {
        
        private static DataSet<Lineitem> 
getLineitemDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(lineitemPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("1000011000100000")
                                        .tupleType(Lineitem.class);
        }
        
        private static DataSet<Customer> 
getCustomerDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(customerPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("10000010")
                                        .tupleType(Customer.class);
        }
        
        private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment 
env) {
                return env.readCsvFile(ordersPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("110010010")
                                        .tupleType(Order.class);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
index 22b00f6..44f134a 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -294,7 +294,7 @@ public class WebLogAnalysis {
                // Create DataSet for documents relation (URL, Doc-Text)
                if(fileOutput) {
                        return env.readCsvFile(documentsPath)
-                                               .fieldDelimiter('|')
+                                               .fieldDelimiter("|")
                                                .types(String.class, 
String.class);
                } else {
                        return WebLogData.getDocumentDataSet(env);
@@ -305,7 +305,7 @@ public class WebLogAnalysis {
                // Create DataSet for ranks relation (Rank, URL, 
Avg-Visit-Duration)
                if(fileOutput) {
                        return env.readCsvFile(ranksPath)
-                                               .fieldDelimiter('|')
+                                               .fieldDelimiter("|")
                                                .types(Integer.class, 
String.class, Integer.class);
                } else {
                        return WebLogData.getRankDataSet(env);
@@ -316,7 +316,7 @@ public class WebLogAnalysis {
                // Create DataSet for visits relation (URL, Date)
                if(fileOutput) {
                        return env.readCsvFile(visitsPath)
-                                               .fieldDelimiter('|')
+                                               .fieldDelimiter("|")
                                                .includeFields("011000000")
                                                .types(String.class, 
String.class);
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
index 9e1ad7d..269ba7f 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
@@ -135,7 +135,7 @@ object KMeans {
     if (fileOutput) {
       env.readCsvFile[(Double, Double)](
         pointsPath,
-        fieldDelimiter = ' ',
+        fieldDelimiter = " ",
         includedFields = Array(0, 1))
         .map { x => new Point(x._1, x._2)}
     }
@@ -151,7 +151,7 @@ object KMeans {
     if (fileOutput) {
       env.readCsvFile[(Int, Double, Double)](
         centersPath,
-        fieldDelimiter = ' ',
+        fieldDelimiter = " ",
         includedFields = Array(0, 1, 2))
         .map { x => new Centroid(x._1, x._2, x._3)}
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
index 5a37a48..dfebfe9 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
@@ -143,7 +143,7 @@ object ConnectedComponents {
     if (fileOutput) {
       env.readCsvFile[(Long, Long)](
         edgesPath,
-        fieldDelimiter = ' ',
+        fieldDelimiter = " ",
         includedFields = Array(0, 1))
         .map { x => (x._1, x._2)}
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
index d4e8920..6ebacee 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -160,7 +160,7 @@ object EnumTrianglesBasic {
 
   private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
     if (fileOutput) {
-      env.readCsvFile[Edge](edgePath, fieldDelimiter = ' ', includedFields = 
Array(0, 1))
+      env.readCsvFile[Edge](edgePath, fieldDelimiter = " ", includedFields = 
Array(0, 1))
     } else {
       val edges = EnumTrianglesData.EDGES.map {
         case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], 
v2.asInstanceOf[Int])

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
index 274cf9a..bde1bf3 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -229,7 +229,7 @@ object EnumTrianglesOpt {
     if (fileOutput) {
       env.readCsvFile[Edge](
         edgePath,
-        fieldDelimiter = ' ',
+        fieldDelimiter = " ",
         includedFields = Array(0, 1))
     } else {
       val edges = EnumTrianglesData.EDGES.map {

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index 8f0ac0d..1d70ae9 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -179,7 +179,7 @@ object PageRankBasic {
 
   private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
     if (fileOutput) {
-      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = ' ', 
lineDelimiter = "\n")
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", 
lineDelimiter = "\n")
         .map(x => x._1)
     } else {
       env.generateSequence(1, 15)
@@ -188,7 +188,7 @@ object PageRankBasic {
 
   private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
     if (fileOutput) {
-      env.readCsvFile[Link](linksInputPath, fieldDelimiter = ' ',
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
         includedFields = Array(0, 1))
     } else {
       val edges = PageRankData.EDGES.map { case Array(v1, v2) => 
Link(v1.asInstanceOf[Long],

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 3bae2c0..b281b98 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -101,7 +101,7 @@ object TransitiveClosureNaive {
     if (fileOutput) {
       env.readCsvFile[(Long, Long)](
         edgesPath,
-        fieldDelimiter = ' ',
+        fieldDelimiter = " ",
         includedFields = Array(0, 1))
         .map { x => (x._1, x._2)}
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
index 86526a6..ea96e7c 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
@@ -172,7 +172,7 @@ object LinearRegression {
     if (fileOutput) {
       env.readCsvFile[(Double, Double)](
         dataPath,
-        fieldDelimiter = ' ',
+        fieldDelimiter = " ",
         includedFields = Array(0, 1))
         .map { t => new Data(t._1, t._2) }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
index b39a2dd..1b1d5c5 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
@@ -155,14 +155,14 @@ object TPCHQuery10 {
                          DataSet[Tuple5[Int, String, String, Int, Double]] = {
     env.readCsvFile[Tuple5[Int, String, String, Int, Double]](
         customerPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0,1,2,3,5) )
   }
   
   private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Tuple3[Int, 
Int, String]] = {
     env.readCsvFile[Tuple3[Int, Int, String]](
         ordersPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0, 1, 4) )
   }
   
@@ -170,14 +170,14 @@ object TPCHQuery10 {
                          DataSet[Tuple4[Int, Double, Double, String]] = {
     env.readCsvFile[Tuple4[Int, Double, Double, String]](
         lineitemPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0, 5, 6, 8) )
   }
 
   private def getNationDataSet(env: ExecutionEnvironment): DataSet[Tuple2[Int, 
String]] = {
     env.readCsvFile[Tuple2[Int, String]](
         nationPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0, 1) )
   }
   

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
index 6b8b0fd..0e0ecd3 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
@@ -151,21 +151,21 @@ object TPCHQuery3 {
   private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] 
= {
     env.readCsvFile[Lineitem](
         lineitemPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0, 5, 6, 10) )
   }
 
   private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] 
= {
     env.readCsvFile[Customer](
         customerPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0, 6) )
   }
   
   private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
     env.readCsvFile[Order](
         ordersPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0, 1, 4, 7) )
   }
   

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
index 1ab5c01..38b6415 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
@@ -167,7 +167,7 @@ object WebLogAnalysis {
     if (fileOutput) {
       env.readCsvFile[(String, String)](
         documentsPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0, 1))
     }
     else {
@@ -182,7 +182,7 @@ object WebLogAnalysis {
     if (fileOutput) {
       env.readCsvFile[(Int, String, Int)](
         ranksPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(0, 1, 2))
     }
     else {
@@ -197,7 +197,7 @@ object WebLogAnalysis {
     if (fileOutput) {
       env.readCsvFile[(String, String)](
         visitsPath,
-        fieldDelimiter = '|',
+        fieldDelimiter = "|",
         includedFields = Array(1, 2))
     }
     else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index fe2ae14..7669c39 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -50,7 +50,7 @@ public class CsvInputFormat<OUT extends Tuple> extends 
GenericCsvInputFormat<OUT
        
        public static final String DEFAULT_LINE_DELIMITER = "\n";
        
-       public static final char DEFAULT_FIELD_DELIMITER = ',';
+       public static final String DEFAULT_FIELD_DELIMITER = ",";
 
 
        private transient Object[] parsedValues;
@@ -74,7 +74,7 @@ public class CsvInputFormat<OUT extends Tuple> extends 
GenericCsvInputFormat<OUT
                this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
types);
        }       
        
-       public CsvInputFormat(Path filePath, String lineDelimiter, char 
fieldDelimiter, Class<?>... types) {
+       public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, Class<?>... types) {
                super(filePath);
 
                setDelimiter(lineDelimiter);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index f4c9e32..b31c9a4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -49,7 +49,7 @@ public class CsvReader {
        
        protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
        
-       protected char fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
+       protected String fieldDelimiter = 
CsvInputFormat.DEFAULT_FIELD_DELIMITER;
        
        protected String commentPrefix = null; //default: no comments
 
@@ -100,7 +100,20 @@ public class CsvReader {
         * @param delimiter The delimiter that separates the fields in one row.
         * @return The CSV reader instance itself, to allow for fluent function 
chaining.
         */
+       @Deprecated
        public CsvReader fieldDelimiter(char delimiter) {
+               this.fieldDelimiter = String.valueOf(delimiter);
+               return this;
+       }
+
+       /**
+        * Configures the delimiter that separates the fields within a row. The 
comma character
+        * ({@code ','}) is used by default.
+        *
+        * @param delimiter The delimiter that separates the fields in one row.
+        * @return The CSV reader instance itself, to allow for fluent function 
chaining.
+        */
+       public CsvReader fieldDelimiter(String delimiter) {
                this.fieldDelimiter = delimiter;
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index e2f9f35..6a22767 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -73,7 +73,7 @@ public class PrimitiveInputFormat<OT> extends 
DelimitedInputFormat<OT> {
                }
 
                // Null character as delimiter is used because there's only 1 
field to be parsed
-               if (parser.parseField(bytes, offset, numBytes + offset, '\0', 
reuse) >= 0) {
+               if (parser.parseField(bytes, offset, numBytes + offset, new 
byte[]{'\0'}, reuse) >= 0) {
                        return parser.getLastResult();
                } else {
                        String s = new String(bytes, offset, numBytes);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
index 3e06fb5..5f83a19 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
@@ -142,12 +142,7 @@ public class CsvInputFormat extends 
GenericCsvInputFormat<Record> {
                
                final String fieldDelimStr = 
config.getString(FIELD_DELIMITER_PARAMETER, null);
                if (fieldDelimStr != null) {
-                       if (fieldDelimStr.length() != 1) {
-                               throw new IllegalArgumentException("Invalid 
configuration for CsvInputFormat: " +
-                                               "Field delimiter must be a 
single character");
-                       } else {
-                               setFieldDelimiter(fieldDelimStr.charAt(0));
-                       }
+                       setFieldDelimiter(fieldDelimStr);
                }
                
                // read number of field configured via configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index c335db1..906e6d9 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -69,7 +69,7 @@ public class CsvInputFormatTest {
                        final FileInputSplit split = 
createTempFile(fileContent);
                        
                        CsvInputFormat<Tuple3<String, Integer, Double>> format 
= 
-                                       new CsvInputFormat<Tuple3<String, 
Integer, Double>>(PATH, "\n", '|',  String.class, Integer.class, Double.class);
+                                       new CsvInputFormat<Tuple3<String, 
Integer, Double>>(PATH, "\n", "|",  String.class, Integer.class, Double.class);
                        format.setLenient(true);
                
                        final Configuration parameters = new Configuration();
@@ -117,7 +117,7 @@ public class CsvInputFormatTest {
                        final FileInputSplit split = 
createTempFile(fileContent);
                        
                        CsvInputFormat<Tuple3<String, Integer, Double>> format 
= 
-                                       new CsvInputFormat<Tuple3<String, 
Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
+                                       new CsvInputFormat<Tuple3<String, 
Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
                        format.setCommentPrefix("#");
                
                        final Configuration parameters = new Configuration();
@@ -161,7 +161,7 @@ public class CsvInputFormatTest {
                        final FileInputSplit split = 
createTempFile(fileContent);
                        
                        CsvInputFormat<Tuple3<String, Integer, Double>> format 
= 
-                                       new CsvInputFormat<Tuple3<String, 
Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
+                                       new CsvInputFormat<Tuple3<String, 
Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
                        format.setCommentPrefix("//");
                
                        final Configuration parameters = new Configuration();
@@ -197,7 +197,7 @@ public class CsvInputFormatTest {
                        final String fileContent = 
"abc|def|ghijk\nabc||hhg\n|||";
                        final FileInputSplit split = 
createTempFile(fileContent);
                        
-                       final CsvInputFormat<Tuple3<String, String, String>> 
format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", '|', 
String.class, String.class, String.class);
+                       final CsvInputFormat<Tuple3<String, String, String>> 
format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", 
String.class, String.class, String.class);
                
                        final Configuration parameters = new Configuration();
                        format.configure(parameters);
@@ -236,12 +236,12 @@ public class CsvInputFormatTest {
        @Test
        public void readStringFieldsWithTrailingDelimiters() {
                try {
-                       final String fileContent = 
"abc|def|ghijk\nabc||hhg\n|||\n";
+                       final String fileContent = 
"abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
                        final FileInputSplit split = 
createTempFile(fileContent);
                
                        final CsvInputFormat<Tuple3<String, String, String>> 
format = new CsvInputFormat<Tuple3<String, String, String>>(PATH);
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|-");
                        format.setFieldTypes(String.class, String.class, 
String.class);
                        
                        format.configure(new Configuration());
@@ -284,7 +284,7 @@ public class CsvInputFormatTest {
                
                        final CsvInputFormat<Tuple5<Integer, Integer, Integer, 
Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, 
Integer, Integer, Integer>>(PATH);
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypes(Integer.class, Integer.class, 
Integer.class, Integer.class, Integer.class);
                        
                        format.configure(new Configuration());
@@ -325,7 +325,7 @@ public class CsvInputFormatTest {
                
                        final CsvInputFormat<Tuple2<Integer, Integer>> format = 
new CsvInputFormat<Tuple2<Integer, Integer>>(PATH);
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypes(Integer.class, Integer.class);
                        
                        format.configure(new Configuration());
@@ -356,12 +356,13 @@ public class CsvInputFormatTest {
        @Test
        public void testReadSparseWithNullFieldsForTypes() throws IOException {
                try {
-                       final String fileContent = 
"111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
+                       final String fileContent = 
"111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
+                                       
"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
                        final FileInputSplit split = 
createTempFile(fileContent);       
                        
                        final CsvInputFormat<Tuple3<Integer, Integer, Integer>> 
format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|x|");
                        format.setFieldTypes(Integer.class, null, null, 
Integer.class, null, null, null, Integer.class);
                        
                        format.configure(new Configuration());
@@ -398,7 +399,7 @@ public class CsvInputFormatTest {
                        
                        final CsvInputFormat<Tuple3<Integer, Integer, Integer>> 
format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        
                        format.setFields(new int[] {0, 3, 7}, new Class<?>[] 
{Integer.class, Integer.class, Integer.class});
                        
@@ -432,12 +433,13 @@ public class CsvInputFormatTest {
        @Test
        public void testReadSparseWithMask() throws IOException {
                try {
-                       final String fileContent = 
"111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
+                       final String fileContent = 
"111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
+                                       
"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
                        final FileInputSplit split = 
createTempFile(fileContent);       
                        
                        final CsvInputFormat<Tuple3<Integer, Integer, Integer>> 
format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("&&");
 
                        format.setFields(new boolean[] { true, false, false, 
true, false, false, false, true }, new Class<?>[] { Integer.class,
                                        Integer.class, Integer.class });
@@ -473,7 +475,7 @@ public class CsvInputFormatTest {
                try {
                        final CsvInputFormat<Tuple3<Integer, Integer, Integer>> 
format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        
                        try {
                                format.setFields(new int[] {8, 1, 3}, new 
Class<?>[] {Integer.class, Integer.class, Integer.class});
@@ -500,7 +502,7 @@ public class CsvInputFormatTest {
                for (Object[] failure : failures) {
                        String input = (String) failure[0];
 
-                       int result = stringParser.parseField(input.getBytes(), 
0, input.length(), '|', null);
+                       int result = stringParser.parseField(input.getBytes(), 
0, input.length(), new byte[]{'|'}, null);
 
                        assertThat(result, is(-1));
                        assertThat(stringParser.getErrorState(), 
is(failure[1]));

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
index ea35b5d..5ccfdd9 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
@@ -137,7 +137,7 @@ public class CsvInputFormatTest {
                        new CsvInputFormat.ConfigBuilder(null, parameters)
                                .field(StringValue.class, 
0).field(StringValue.class, 1).field(StringValue.class, 2);
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        
                        format.configure(parameters);
                        format.open(split);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
 
b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
index 1e05a31..4dae630 100644
--- 
a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
+++ 
b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
@@ -189,28 +189,28 @@ public class TPCHQuery10 {
        
        private static DataSet<Tuple5<Integer, String, String, Integer, 
Double>> getCustomerDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(customerPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("11110100")
                                        .types(Integer.class, String.class, 
String.class, Integer.class, Double.class);
        }
        
        private static DataSet<Tuple3<Integer, Integer, String>> 
getOrdersDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(ordersPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("110010000")
                                        .types(Integer.class, Integer.class, 
String.class);
        }
 
        private static DataSet<Tuple4<Integer, Double, Double, String>> 
getLineitemDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(lineitemPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("1000011010000000")
                                        .types(Integer.class, Double.class, 
Double.class, String.class);
        }
        
        private static DataSet<Tuple2<Integer, String>> 
getNationsDataSet(ExecutionEnvironment env) {
                return env.readCsvFile(nationPath)
-                                       .fieldDelimiter('|')
+                                       .fieldDelimiter("|")
                                        .includeFields("1100")
                                        .types(Integer.class, String.class);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 391986d..ffd8398 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -196,7 +196,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * @param filePath The path of the file, as a URI (e.g., 
"file:///some/local/file" or
    *                 "hdfs://host:port/file/path").   * @param lineDelimiter
    * @param lineDelimiter The string that separates lines, defaults to newline.
-   * @param fieldDelimiter The char that separates individual fields, defaults 
to ','.
+   * @param fieldDelimiter The string that separates individual fields, 
defaults to ",".
    * @param ignoreFirstLine Whether the first line in the file should be 
ignored.
    * @param ignoreComments Lines that start with the given String are ignored, 
disabled by default.
    * @param lenient Whether the parser should silently ignore malformed lines.
@@ -206,7 +206,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def readCsvFile[T <: Product : ClassTag : TypeInformation](
       filePath: String,
       lineDelimiter: String = "\n",
-      fieldDelimiter: Char = ',',
+      fieldDelimiter: String = ",",
       ignoreFirstLine: Boolean = false,
       ignoreComments: String = null,
       lenient: Boolean = false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
index 9116db8..f0a1dd7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
@@ -72,7 +72,7 @@ public class ConnectedComponentsWithObjectMapITCase extends 
JavaProgramTestBase
                // read vertex and edge data
                DataSet<Tuple1<Long>> vertices = 
env.readCsvFile(verticesPath).types(Long.class);
                
-               DataSet<Tuple2<Long, Long>> edges = 
env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class)
+               DataSet<Tuple2<Long, Long>> edges = 
env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
                                                                                
                .flatMap(new UndirectEdge());
                                
                // assign the initial components (equal to the vertex id)

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
index 0821af5..44447d8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
@@ -64,13 +64,13 @@ public class KMeansForTest implements Program {
 
                // get input data
                DataSet<Point> points = env.readCsvFile(pointsPath)
-                               .fieldDelimiter('|')
+                               .fieldDelimiter("|")
                                .includeFields(true, true)
                                .types(Double.class, Double.class)
                                .map(new TuplePointConverter());
 
                DataSet<Centroid> centroids = env.readCsvFile(centersPath)
-                               .fieldDelimiter('|')
+                               .fieldDelimiter("|")
                                .includeFields(true, true, true)
                                .types(Integer.class, Double.class, 
Double.class)
                                .map(new TupleCentroidConverter());

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index b0f1e11..e6e1e21 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -129,7 +129,7 @@ class CsvInputFormatTest {
       val format = new ScalaCsvInputFormat[(String, String, String)](
         PATH, createTypeInformation[(String, String, String)])
       format.setDelimiter("\n")
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|")
       val parameters = new Configuration
       format.configure(parameters)
       format.open(split)
@@ -164,12 +164,12 @@ class CsvInputFormatTest {
   @Test
   def readStringFieldsWithTrailingDelimiters(): Unit = {
     try {
-      val fileContent = "abc|def|ghijk\nabc||hhg\n|||\n"
+      val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(String, String, String)](
         PATH, createTypeInformation[(String, String, String)])
       format.setDelimiter("\n")
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|-")
       val parameters = new Configuration
       format.configure(parameters)
       format.open(split)
@@ -207,7 +207,7 @@ class CsvInputFormatTest {
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(Int, Int, Int, Int, Int)](
         PATH, createTypeInformation[(Int, Int, Int, Int, Int)])
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|")
       format.configure(new Configuration)
       format.open(split)
       var result: (Int, Int, Int, Int, Int) = null
@@ -238,10 +238,11 @@ class CsvInputFormatTest {
   @Test
   def testReadFirstN(): Unit = {
     try {
-      val fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n"
+      val fileContent = "111|x|222|x|333|x|444|x|555|x|\n" +
+        "666|x|777|x|888|x|999|x|000|x|\n"
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(Int, Int)](PATH, 
createTypeInformation[(Int, Int)])
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|x|")
       format.configure(new Configuration)
       format.open(split)
       var result: (Int, Int) = null
@@ -272,7 +273,7 @@ class CsvInputFormatTest {
       val format = new ScalaCsvInputFormat[(Int, Int, Int)](
         PATH,
         createTypeInformation[(Int, Int, Int)])
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|")
       format.setFields(Array(0, 3, 7), Array(classOf[Integer], 
classOf[Integer], classOf[Integer]))
       format.configure(new Configuration)
       format.open(split)
@@ -303,7 +304,7 @@ class CsvInputFormatTest {
       val format = new ScalaCsvInputFormat[(Int, Int, Int)](
         PATH,
         createTypeInformation[(Int, Int, Int)])
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|")
       try {
         format.setFields(Array(8, 1, 3), 
Array(classOf[Integer],classOf[Integer],classOf[Integer]))
         fail("Input sequence should have been rejected.")

Reply via email to