Repository: flink Updated Branches: refs/heads/master 06b2acf79 -> 0548a93df
http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index 5ccabf5..56b1dd0 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -44,15 +44,15 @@ public class VarLengthStringParserTest { StringValue s = new StringValue(); int startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 9); assertTrue(s.getValue().equals("abcdefgh")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 11); assertTrue(s.getValue().equals("i")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 18); assertTrue(s.getValue().equals("jklmno")); @@ -60,18 +60,18 @@ public class VarLengthStringParserTest { // check single field not terminated recBytes = "abcde".getBytes(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 5); assertTrue(s.getValue().equals("abcde")); // check last field not terminated recBytes = "abcde|fg".getBytes(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 6); assertTrue(s.getValue().equals("abcde")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 8); assertTrue(s.getValue().equals("fg")); } @@ -84,15 +84,15 @@ public class VarLengthStringParserTest { StringValue s = new StringValue(); int startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 11); assertTrue(s.getValue().equals("abcdefgh")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 15); assertTrue(s.getValue().equals("i")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 24); assertTrue(s.getValue().equals("jklmno")); @@ -100,40 +100,40 @@ public class VarLengthStringParserTest { // check single field not terminated recBytes = "\"abcde\"".getBytes(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 7); assertTrue(s.getValue().equals("abcde")); // check last field not terminated recBytes = "\"abcde\"|\"fg\"".getBytes(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 8); assertTrue(s.getValue().equals("abcde")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 12); assertTrue(s.getValue().equals("fg")); // check delimiter in quotes recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 11); assertTrue(s.getValue().equals("abcde|fg")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 26); assertTrue(s.getValue().equals("hij|kl|mn|op")); // check delimiter in quotes last field not terminated recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 11); assertTrue(s.getValue().equals("abcde|fg")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 25); assertTrue(s.getValue().equals("hij|kl|mn|op")); } @@ -146,15 +146,15 @@ public class VarLengthStringParserTest { StringValue s = new StringValue(); int startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 13); assertTrue(s.getValue().equals("abcdefgh")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 25); assertTrue(s.getValue().equals("i")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 39); assertTrue(s.getValue().equals("jklmno")); @@ -163,29 +163,29 @@ public class VarLengthStringParserTest { s = new StringValue(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 13); assertTrue(s.getValue().equals("abcdefgh")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 25); assertTrue(s.getValue().equals("i")); - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 38); assertTrue(s.getValue().equals("jklmno")); // check single field not terminated recBytes = " \t\"abcde\"\t\t \t ".getBytes(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 16); assertTrue(s.getValue().equals("abcde")); // check single field terminated recBytes = " \t\"abcde\"\t\t \t |".getBytes(); startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos == 17); assertTrue(s.getValue().equals("abcde")); } @@ -198,7 +198,7 @@ public class VarLengthStringParserTest { StringValue s = new StringValue(); int startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, '|', s); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos < 0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index 5f49bd3..73733fd 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -313,7 +313,7 @@ public class KMeans { if(fileOutput) { // read points from CSV file return env.readCsvFile(pointsPath) - .fieldDelimiter(' ') + .fieldDelimiter(" ") .includeFields(true, true) .types(Double.class, Double.class) .map(new TuplePointConverter()); @@ -325,7 +325,7 @@ public class KMeans { private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) { if(fileOutput) { return env.readCsvFile(centersPath) - .fieldDelimiter(' ') + .fieldDelimiter(" ") .includeFields(true, true, true) .types(Integer.class, Double.class, Double.class) .map(new TupleCentroidConverter()); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java index 1519a35..cb99e23 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java @@ -234,7 +234,7 @@ public class ConnectedComponents implements ProgramDescription { private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) { if(fileOutput) { - return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class); + return env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class); } else { return ConnectedComponentsData.getDefaultEdgeDataSet(env); } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java index 984b5a5..2b65ad6 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java @@ -218,7 +218,7 @@ public class EnumTrianglesBasic { private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) { if(fileOutput) { return env.readCsvFile(edgePath) - .fieldDelimiter(' ') + .fieldDelimiter(" ") .includeFields(true, true) .types(Integer.class, Integer.class) .map(new TupleEdgeConverter()); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java index afbca1e..3f05a2b 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java @@ -342,7 +342,7 @@ public class EnumTrianglesOpt { private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) { if(fileOutput) { return env.readCsvFile(edgePath) - .fieldDelimiter(' ') + .fieldDelimiter(" ") .includeFields(true, true) .types(Integer.class, Integer.class) .map(new TupleEdgeConverter()); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java index 83cecc2..4e4d9ad 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java @@ -263,7 +263,7 @@ public class PageRankBasic { if(fileOutput) { return env .readCsvFile(pagesInputPath) - .fieldDelimiter(' ') + .fieldDelimiter(" ") .lineDelimiter("\n") .types(Long.class) .map(new MapFunction<Tuple1<Long>, Long>() { @@ -278,7 +278,7 @@ public class PageRankBasic { private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env) { if(fileOutput) { return env.readCsvFile(linksInputPath) - .fieldDelimiter(' ') + .fieldDelimiter(" ") .lineDelimiter("\n") .types(Long.class, Long.class); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java index f532a95..abe96c8 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java @@ -147,7 +147,7 @@ public class TransitiveClosureNaive implements ProgramDescription { private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) { if(fileOutput) { - return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class); + return env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class); } else { return ConnectedComponentsData.getDefaultEdgeDataSet(env); } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java index 1ee2b80..9694647 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java @@ -297,7 +297,7 @@ public class LinearRegression { if(fileOutput) { // read data from CSV file return env.readCsvFile(dataPath) - .fieldDelimiter(' ') + .fieldDelimiter(" ") .includeFields(true, true) .types(Double.class, Double.class) .map(new TupleDataConverter()); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java index 4192910..09b88fa 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java @@ -125,7 +125,7 @@ public class EmptyFieldsCountAccumulator { } else { source = env .readCsvFile(filePath) - .fieldDelimiter(';') + .fieldDelimiter(";") .types(String.class, String.class, String.class); } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java index 436a3d4..3dc0472 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java @@ -205,28 +205,28 @@ public class TPCHQuery10 { private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) { return env.readCsvFile(customerPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("11110100") .types(Integer.class, String.class, String.class, Integer.class, Double.class); } private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) { return env.readCsvFile(ordersPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("110010000") .types(Integer.class, Integer.class, String.class); } private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) { return env.readCsvFile(lineitemPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("1000011010000000") .types(Integer.class, Double.class, Double.class, String.class); } private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) { return env.readCsvFile(nationPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("1100") .types(Integer.class, String.class); } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java index 46161b4..9a6e58c 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java @@ -250,21 +250,21 @@ public class TPCHQuery3 { private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env) { return env.readCsvFile(lineitemPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("1000011000100000") .tupleType(Lineitem.class); } private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) { return env.readCsvFile(customerPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("10000010") .tupleType(Customer.class); } private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) { return env.readCsvFile(ordersPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("110010010") .tupleType(Order.class); } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java index 22b00f6..44f134a 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java @@ -294,7 +294,7 @@ public class WebLogAnalysis { // Create DataSet for documents relation (URL, Doc-Text) if(fileOutput) { return env.readCsvFile(documentsPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .types(String.class, String.class); } else { return WebLogData.getDocumentDataSet(env); @@ -305,7 +305,7 @@ public class WebLogAnalysis { // Create DataSet for ranks relation (Rank, URL, Avg-Visit-Duration) if(fileOutput) { return env.readCsvFile(ranksPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .types(Integer.class, String.class, Integer.class); } else { return WebLogData.getRankDataSet(env); @@ -316,7 +316,7 @@ public class WebLogAnalysis { // Create DataSet for visits relation (URL, Date) if(fileOutput) { return env.readCsvFile(visitsPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("011000000") .types(String.class, String.class); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index 9e1ad7d..269ba7f 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -135,7 +135,7 @@ object KMeans { if (fileOutput) { env.readCsvFile[(Double, Double)]( pointsPath, - fieldDelimiter = ' ', + fieldDelimiter = " ", includedFields = Array(0, 1)) .map { x => new Point(x._1, x._2)} } @@ -151,7 +151,7 @@ object KMeans { if (fileOutput) { env.readCsvFile[(Int, Double, Double)]( centersPath, - fieldDelimiter = ' ', + fieldDelimiter = " ", includedFields = Array(0, 1, 2)) .map { x => new Centroid(x._1, x._2, x._3)} } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala index 5a37a48..dfebfe9 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala @@ -143,7 +143,7 @@ object ConnectedComponents { if (fileOutput) { env.readCsvFile[(Long, Long)]( edgesPath, - fieldDelimiter = ' ', + fieldDelimiter = " ", includedFields = Array(0, 1)) .map { x => (x._1, x._2)} } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala index d4e8920..6ebacee 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala @@ -160,7 +160,7 @@ object EnumTrianglesBasic { private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = { if (fileOutput) { - env.readCsvFile[Edge](edgePath, fieldDelimiter = ' ', includedFields = Array(0, 1)) + env.readCsvFile[Edge](edgePath, fieldDelimiter = " ", includedFields = Array(0, 1)) } else { val edges = EnumTrianglesData.EDGES.map { case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int]) http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala index 274cf9a..bde1bf3 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala @@ -229,7 +229,7 @@ object EnumTrianglesOpt { if (fileOutput) { env.readCsvFile[Edge]( edgePath, - fieldDelimiter = ' ', + fieldDelimiter = " ", includedFields = Array(0, 1)) } else { val edges = EnumTrianglesData.EDGES.map { http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala index 8f0ac0d..1d70ae9 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala @@ -179,7 +179,7 @@ object PageRankBasic { private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = { if (fileOutput) { - env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = ' ', lineDelimiter = "\n") + env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n") .map(x => x._1) } else { env.generateSequence(1, 15) @@ -188,7 +188,7 @@ object PageRankBasic { private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = { if (fileOutput) { - env.readCsvFile[Link](linksInputPath, fieldDelimiter = ' ', + env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ", includedFields = Array(0, 1)) } else { val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long], http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala index 3bae2c0..b281b98 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala @@ -101,7 +101,7 @@ object TransitiveClosureNaive { if (fileOutput) { env.readCsvFile[(Long, Long)]( edgesPath, - fieldDelimiter = ' ', + fieldDelimiter = " ", includedFields = Array(0, 1)) .map { x => (x._1, x._2)} } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala index 86526a6..ea96e7c 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala @@ -172,7 +172,7 @@ object LinearRegression { if (fileOutput) { env.readCsvFile[(Double, Double)]( dataPath, - fieldDelimiter = ' ', + fieldDelimiter = " ", includedFields = Array(0, 1)) .map { t => new Data(t._1, t._2) } } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala index b39a2dd..1b1d5c5 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala @@ -155,14 +155,14 @@ object TPCHQuery10 { DataSet[Tuple5[Int, String, String, Int, Double]] = { env.readCsvFile[Tuple5[Int, String, String, Int, Double]]( customerPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0,1,2,3,5) ) } private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Tuple3[Int, Int, String]] = { env.readCsvFile[Tuple3[Int, Int, String]]( ordersPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0, 1, 4) ) } @@ -170,14 +170,14 @@ object TPCHQuery10 { DataSet[Tuple4[Int, Double, Double, String]] = { env.readCsvFile[Tuple4[Int, Double, Double, String]]( lineitemPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0, 5, 6, 8) ) } private def getNationDataSet(env: ExecutionEnvironment): DataSet[Tuple2[Int, String]] = { env.readCsvFile[Tuple2[Int, String]]( nationPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0, 1) ) } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala index 6b8b0fd..0e0ecd3 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala @@ -151,21 +151,21 @@ object TPCHQuery3 { private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = { env.readCsvFile[Lineitem]( lineitemPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0, 5, 6, 10) ) } private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = { env.readCsvFile[Customer]( customerPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0, 6) ) } private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = { env.readCsvFile[Order]( ordersPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0, 1, 4, 7) ) } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala index 1ab5c01..38b6415 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala @@ -167,7 +167,7 @@ object WebLogAnalysis { if (fileOutput) { env.readCsvFile[(String, String)]( documentsPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0, 1)) } else { @@ -182,7 +182,7 @@ object WebLogAnalysis { if (fileOutput) { env.readCsvFile[(Int, String, Int)]( ranksPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(0, 1, 2)) } else { @@ -197,7 +197,7 @@ object WebLogAnalysis { if (fileOutput) { env.readCsvFile[(String, String)]( visitsPath, - fieldDelimiter = '|', + fieldDelimiter = "|", includedFields = Array(1, 2)) } else { http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java index fe2ae14..7669c39 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java @@ -50,7 +50,7 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT public static final String DEFAULT_LINE_DELIMITER = "\n"; - public static final char DEFAULT_FIELD_DELIMITER = ','; + public static final String DEFAULT_FIELD_DELIMITER = ","; private transient Object[] parsedValues; @@ -74,7 +74,7 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types); } - public CsvInputFormat(Path filePath, String lineDelimiter, char fieldDelimiter, Class<?>... types) { + public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) { super(filePath); setDelimiter(lineDelimiter); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index f4c9e32..b31c9a4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -49,7 +49,7 @@ public class CsvReader { protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER; - protected char fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER; + protected String fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER; protected String commentPrefix = null; //default: no comments @@ -100,7 +100,20 @@ public class CsvReader { * @param delimiter The delimiter that separates the fields in one row. * @return The CSV reader instance itself, to allow for fluent function chaining. */ + @Deprecated public CsvReader fieldDelimiter(char delimiter) { + this.fieldDelimiter = String.valueOf(delimiter); + return this; + } + + /** + * Configures the delimiter that separates the fields within a row. The comma character + * ({@code ','}) is used by default. + * + * @param delimiter The delimiter that separates the fields in one row. + * @return The CSV reader instance itself, to allow for fluent function chaining. + */ + public CsvReader fieldDelimiter(String delimiter) { this.fieldDelimiter = delimiter; return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java index e2f9f35..6a22767 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java @@ -73,7 +73,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> { } // Null character as delimiter is used because there's only 1 field to be parsed - if (parser.parseField(bytes, offset, numBytes + offset, '\0', reuse) >= 0) { + if (parser.parseField(bytes, offset, numBytes + offset, new byte[]{'\0'}, reuse) >= 0) { return parser.getLastResult(); } else { String s = new String(bytes, offset, numBytes); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java index 3e06fb5..5f83a19 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java @@ -142,12 +142,7 @@ public class CsvInputFormat extends GenericCsvInputFormat<Record> { final String fieldDelimStr = config.getString(FIELD_DELIMITER_PARAMETER, null); if (fieldDelimStr != null) { - if (fieldDelimStr.length() != 1) { - throw new IllegalArgumentException("Invalid configuration for CsvInputFormat: " + - "Field delimiter must be a single character"); - } else { - setFieldDelimiter(fieldDelimStr.charAt(0)); - } + setFieldDelimiter(fieldDelimStr); } // read number of field configured via configuration http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index c335db1..906e6d9 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -69,7 +69,7 @@ public class CsvInputFormatTest { final FileInputSplit split = createTempFile(fileContent); CsvInputFormat<Tuple3<String, Integer, Double>> format = - new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class); + new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class); format.setLenient(true); final Configuration parameters = new Configuration(); @@ -117,7 +117,7 @@ public class CsvInputFormatTest { final FileInputSplit split = createTempFile(fileContent); CsvInputFormat<Tuple3<String, Integer, Double>> format = - new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class); + new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class); format.setCommentPrefix("#"); final Configuration parameters = new Configuration(); @@ -161,7 +161,7 @@ public class CsvInputFormatTest { final FileInputSplit split = createTempFile(fileContent); CsvInputFormat<Tuple3<String, Integer, Double>> format = - new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class); + new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class); format.setCommentPrefix("//"); final Configuration parameters = new Configuration(); @@ -197,7 +197,7 @@ public class CsvInputFormatTest { final String fileContent = "abc|def|ghijk\nabc||hhg\n|||"; final FileInputSplit split = createTempFile(fileContent); - final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", '|', String.class, String.class, String.class); + final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", String.class, String.class, String.class); final Configuration parameters = new Configuration(); format.configure(parameters); @@ -236,12 +236,12 @@ public class CsvInputFormatTest { @Test public void readStringFieldsWithTrailingDelimiters() { try { - final String fileContent = "abc|def|ghijk\nabc||hhg\n|||\n"; + final String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"; final FileInputSplit split = createTempFile(fileContent); final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|-"); format.setFieldTypes(String.class, String.class, String.class); format.configure(new Configuration()); @@ -284,7 +284,7 @@ public class CsvInputFormatTest { final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypes(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class); format.configure(new Configuration()); @@ -325,7 +325,7 @@ public class CsvInputFormatTest { final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypes(Integer.class, Integer.class); format.configure(new Configuration()); @@ -356,12 +356,13 @@ public class CsvInputFormatTest { @Test public void testReadSparseWithNullFieldsForTypes() throws IOException { try { - final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|"; + final String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" + + "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|"; final FileInputSplit split = createTempFile(fileContent); final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|x|"); format.setFieldTypes(Integer.class, null, null, Integer.class, null, null, null, Integer.class); format.configure(new Configuration()); @@ -398,7 +399,7 @@ public class CsvInputFormatTest { final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFields(new int[] {0, 3, 7}, new Class<?>[] {Integer.class, Integer.class, Integer.class}); @@ -432,12 +433,13 @@ public class CsvInputFormatTest { @Test public void testReadSparseWithMask() throws IOException { try { - final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|"; + final String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" + + "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&"; final FileInputSplit split = createTempFile(fileContent); final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("&&"); format.setFields(new boolean[] { true, false, false, true, false, false, false, true }, new Class<?>[] { Integer.class, Integer.class, Integer.class }); @@ -473,7 +475,7 @@ public class CsvInputFormatTest { try { final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); try { format.setFields(new int[] {8, 1, 3}, new Class<?>[] {Integer.class, Integer.class, Integer.class}); @@ -500,7 +502,7 @@ public class CsvInputFormatTest { for (Object[] failure : failures) { String input = (String) failure[0]; - int result = stringParser.parseField(input.getBytes(), 0, input.length(), '|', null); + int result = stringParser.parseField(input.getBytes(), 0, input.length(), new byte[]{'|'}, null); assertThat(result, is(-1)); assertThat(stringParser.getErrorState(), is(failure[1])); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java index ea35b5d..5ccfdd9 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java @@ -137,7 +137,7 @@ public class CsvInputFormatTest { new CsvInputFormat.ConfigBuilder(null, parameters) .field(StringValue.class, 0).field(StringValue.class, 1).field(StringValue.class, 2); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.configure(parameters); format.open(split); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java index 1e05a31..4dae630 100644 --- a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java +++ b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java @@ -189,28 +189,28 @@ public class TPCHQuery10 { private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) { return env.readCsvFile(customerPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("11110100") .types(Integer.class, String.class, String.class, Integer.class, Double.class); } private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) { return env.readCsvFile(ordersPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("110010000") .types(Integer.class, Integer.class, String.class); } private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) { return env.readCsvFile(lineitemPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("1000011010000000") .types(Integer.class, Double.class, Double.class, String.class); } private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) { return env.readCsvFile(nationPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields("1100") .types(Integer.class, String.class); } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 391986d..ffd8398 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -196,7 +196,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or * "hdfs://host:port/file/path"). * @param lineDelimiter * @param lineDelimiter The string that separates lines, defaults to newline. - * @param fieldDelimiter The char that separates individual fields, defaults to ','. + * @param fieldDelimiter The string that separates individual fields, defaults to ",". * @param ignoreFirstLine Whether the first line in the file should be ignored. * @param ignoreComments Lines that start with the given String are ignored, disabled by default. * @param lenient Whether the parser should silently ignore malformed lines. @@ -206,7 +206,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { def readCsvFile[T <: Product : ClassTag : TypeInformation]( filePath: String, lineDelimiter: String = "\n", - fieldDelimiter: Char = ',', + fieldDelimiter: String = ",", ignoreFirstLine: Boolean = false, ignoreComments: String = null, lenient: Boolean = false, http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java index 9116db8..f0a1dd7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java @@ -72,7 +72,7 @@ public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase // read vertex and edge data DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class); - DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class) + DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class) .flatMap(new UndirectEdge()); // assign the initial components (equal to the vertex id) http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java index 0821af5..44447d8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java @@ -64,13 +64,13 @@ public class KMeansForTest implements Program { // get input data DataSet<Point> points = env.readCsvFile(pointsPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields(true, true) .types(Double.class, Double.class) .map(new TuplePointConverter()); DataSet<Centroid> centroids = env.readCsvFile(centersPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .includeFields(true, true, true) .types(Integer.class, Double.class, Double.class) .map(new TupleCentroidConverter()); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala index b0f1e11..e6e1e21 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala @@ -129,7 +129,7 @@ class CsvInputFormatTest { val format = new ScalaCsvInputFormat[(String, String, String)]( PATH, createTypeInformation[(String, String, String)]) format.setDelimiter("\n") - format.setFieldDelimiter('|') + format.setFieldDelimiter("|") val parameters = new Configuration format.configure(parameters) format.open(split) @@ -164,12 +164,12 @@ class CsvInputFormatTest { @Test def readStringFieldsWithTrailingDelimiters(): Unit = { try { - val fileContent = "abc|def|ghijk\nabc||hhg\n|||\n" + val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n" val split = createTempFile(fileContent) val format = new ScalaCsvInputFormat[(String, String, String)]( PATH, createTypeInformation[(String, String, String)]) format.setDelimiter("\n") - format.setFieldDelimiter('|') + format.setFieldDelimiter("|-") val parameters = new Configuration format.configure(parameters) format.open(split) @@ -207,7 +207,7 @@ class CsvInputFormatTest { val split = createTempFile(fileContent) val format = new ScalaCsvInputFormat[(Int, Int, Int, Int, Int)]( PATH, createTypeInformation[(Int, Int, Int, Int, Int)]) - format.setFieldDelimiter('|') + format.setFieldDelimiter("|") format.configure(new Configuration) format.open(split) var result: (Int, Int, Int, Int, Int) = null @@ -238,10 +238,11 @@ class CsvInputFormatTest { @Test def testReadFirstN(): Unit = { try { - val fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n" + val fileContent = "111|x|222|x|333|x|444|x|555|x|\n" + + "666|x|777|x|888|x|999|x|000|x|\n" val split = createTempFile(fileContent) val format = new ScalaCsvInputFormat[(Int, Int)](PATH, createTypeInformation[(Int, Int)]) - format.setFieldDelimiter('|') + format.setFieldDelimiter("|x|") format.configure(new Configuration) format.open(split) var result: (Int, Int) = null @@ -272,7 +273,7 @@ class CsvInputFormatTest { val format = new ScalaCsvInputFormat[(Int, Int, Int)]( PATH, createTypeInformation[(Int, Int, Int)]) - format.setFieldDelimiter('|') + format.setFieldDelimiter("|") format.setFields(Array(0, 3, 7), Array(classOf[Integer], classOf[Integer], classOf[Integer])) format.configure(new Configuration) format.open(split) @@ -303,7 +304,7 @@ class CsvInputFormatTest { val format = new ScalaCsvInputFormat[(Int, Int, Int)]( PATH, createTypeInformation[(Int, Int, Int)]) - format.setFieldDelimiter('|') + format.setFieldDelimiter("|") try { format.setFields(Array(8, 1, 3), Array(classOf[Integer],classOf[Integer],classOf[Integer])) fail("Input sequence should have been rejected.")
