wisgood commented on a change in pull request #8621: [FLINK-12682][connectors] 
StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r311649039
 
 

 ##########
 File path: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
 ##########
 @@ -41,5 +80,62 @@ public void testDuplicate() {
                writer.setSyncOnFlush(false);
                assertFalse(StreamWriterBaseComparator.equals(writer, other));
                assertFalse(StreamWriterBaseComparator.equals(writer, new 
StringWriter<>()));
+
+       }
+
+       @Test
+       public void testMultiRowdelimiters() throws IOException {
+               String rowDelimiter1 = "\n";
+               String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + 
"C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
+               Path testFile1 = new Path(outputDir + "/test01");
+               testRowdelimiter(rowDelimiter1, testDat1, 
StandardCharsets.UTF_8.name(), testFile1);
+
+               String rowDelimiter2 = "\r\n";
+               String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + 
"C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
+               Path testFile2 = new Path(outputDir + "/test02");
+               testRowdelimiter(rowDelimiter2, testDat2, 
StandardCharsets.UTF_8.name(), testFile2);
+
+               String rowDelimiter3 = "*";
+               String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + 
"C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
+               Path testFile3 = new Path(outputDir + "/test03");
+               testRowdelimiter(rowDelimiter3, testDat3, 
StandardCharsets.UTF_8.name(), testFile3);
+
+               String rowDelimiter4 = "##";
+               String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + 
"C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
+               Path testFile4 = new Path(outputDir + "/test04");
+               testRowdelimiter(rowDelimiter4, testDat4, 
StandardCharsets.UTF_8.name(), testFile4);
+
+       }
+
+       private void testRowdelimiter(String rowDelimiter, String inputData, 
String charset, Path outputFile) throws IOException {
+               StringWriter<String> writer = new StringWriter(charset, 
rowDelimiter);
+               writer.open(dfs, outputFile);
+               StringTokenizer lineTokenizer = new StringTokenizer(inputData, 
rowDelimiter);
+               while (lineTokenizer.hasMoreTokens()){
+                       writer.write(lineTokenizer.nextToken());
+               }
+               writer.close();
+               FSDataInputStream inStream = dfs.open(outputFile);
+               byte[] buffer = new byte[inputData.getBytes(charset).length];
+               readFully(inStream, buffer);
+               inStream.close();
+               String outputData = new String(buffer, charset);
+               Assert.assertEquals(inputData, outputData);
+
+       }
+
+       private void readFully(InputStream in, byte[] buffer) throws 
IOException {
 
 Review comment:
   LGTM!
   
   > I've fixed a couple of things myself. Could you check if you have any 
comments/issues regarding my fixup commit at the top: 
https://github.com/pnowojski/flink/commits/f12682 ?
   
   LGTM!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to