afedulov commented on a change in pull request #17598:
URL: https://github.com/apache/flink/pull/17598#discussion_r778892987
##########
File path:
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
##########
@@ -82,41 +102,86 @@ public void testBoundedTextFileSourceWithJackson() throws Exception {
         final File testDir = TMP_FOLDER.newFolder();
         writeFile(testDir, "data.csv", CSV_LINES);
-        final CsvReaderFormat<CitiesPojo> csvFormat = CsvReaderFormat.forPojo(CitiesPojo.class);
-        final List<CitiesPojo> result = initializeSourceAndReadData(testDir, csvFormat);
-
-        final CitiesPojo[] expected =
-                new CitiesPojo[] {
-                    new CitiesPojo(
-                            "Berlin",
-                            new BigDecimal("52.5167"),
-                            new BigDecimal("13.3833"),
-                            "Germany",
-                            "DE",
-                            "Berlin",
-                            "primary",
-                            3644826L),
-                    new CitiesPojo(
-                            "San Francisco",
-                            new BigDecimal("37.7562"),
-                            new BigDecimal("-122.4430"),
-                            "United States",
-                            "US",
-                            "California",
-                            "",
-                            3592294L),
-                    new CitiesPojo(
-                            "Beijing",
-                            new BigDecimal("39.9050"),
-                            new BigDecimal("116.3914"),
-                            "China",
-                            "CN",
-                            "Beijing",
-                            "primary",
-                            19433000L)
-                };
-
-        assertEquals(Arrays.asList(expected), result);
+        final CsvReaderFormat<CityPojo> csvFormat = CsvReaderFormat.forPojo(CityPojo.class);
+        final List<CityPojo> result = initializeSourceAndReadData(testDir, csvFormat);
+
+        assertEquals(Arrays.asList(pojos), result);
+    }
+
+    @Test
+    public void testCustomBulkWriter() throws Exception {
+        final File outDir = TMP_FOLDER.newFolder();
+
+        StreamingFileSink<CityPojo> sink =
+                StreamingFileSink.forBulkFormat(
+                                new Path(outDir.toURI()), new TestCsvBulkWriterFactory())
+                        .withBucketAssigner(new BasePathBucketAssigner<>())
+                        .withBucketCheckInterval(10L)
+                        .withRollingPolicy(build())
+                        .build();
+
+        OneInputStreamOperatorTestHarness<CityPojo, Object> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 1, 1, 0);
+
+        testHarness.setup();
+        testHarness.open();
+
+        testHarness.processElement(new StreamRecord<>(pojos[0], 1L));
+        testHarness.processElement(new StreamRecord<>(pojos[1], 1L));
+        testHarness.processElement(new StreamRecord<>(pojos[2], 1L));
+        testHarness.snapshot(1L, 1L);
+        testHarness.notifyOfCompletedCheckpoint(2L);
+
+        Map<File, String> contents = getFileContentByPath(outDir);
+        Map.Entry<File, String> onlyEntry = contents.entrySet().iterator().next();
+        String[] result = onlyEntry.getValue().split("\n");
+
+        assertThat(result, is(CSV_LINES));
+        // assertThat(result, arrayContaining(CSV_LINES));
+    }
+
+    static Map<File, String> getFileContentByPath(File directory) throws IOException {
+        Map<File, String> contents = new HashMap<>(4);
+
+        final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
+        for (File file : filesInBucket) {
+            contents.put(file, FileUtils.readFileToString(file));
+        }
+        return contents;
+    }
+
+    static final class TestCsvBulkWriterFactory implements BulkWriter.Factory<CityPojo> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public BulkWriter<CityPojo> create(FSDataOutputStream out) {
+
+            final ObjectMapper objectMapper = new ObjectMapper();
+            final CsvMapper csvMapper = new CsvMapper();
+            final CsvSchema schema = csvMapper.schemaFor(CityPojo.class).withoutQuoteChar();
+
+            PojoConverter.ConverterContext converterContext =
+                    new PojoConverter.ConverterContext(objectMapper);
+            final PojoConverter converter = createConverter();
+
+            return CsvBulkWriter.forSchema(csvMapper, schema, converter, converterContext, out);
+        }
+    }
+
+    interface PojoConverter extends Converter<CityPojo, JsonNode, PojoConverter.ConverterContext> {
+
+        class ConverterContext implements Serializable {
+            ObjectMapper mapper;
+
+            public ConverterContext(ObjectMapper csvMapper) {
+                this.mapper = csvMapper;
+            }
+        }
+    }
+
+    public static PojoConverter createConverter() {
+        return (pojo, context) -> context.mapper.valueToTree(pojo);
Review comment:
Added native POJO support to CsvBulkWriter.
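To illustrate: with a native POJO path, the hand-rolled PojoConverter and
ConverterContext plumbing in this test could presumably collapse into a
one-line factory. A minimal sketch, assuming CsvBulkWriter exposes a
forPojo(Class, FSDataOutputStream) entry point (that name and signature are
an assumption, not something this diff confirms):

    // Hypothetical factory leaning on native POJO support; forPojo(...) is assumed.
    static final class PojoCsvBulkWriterFactory implements BulkWriter.Factory<CityPojo> {

        private static final long serialVersionUID = 1L;

        @Override
        public BulkWriter<CityPojo> create(FSDataOutputStream out) {
            // Schema derivation and the POJO-to-JsonNode conversion would then
            // happen inside CsvBulkWriter rather than in the test.
            return CsvBulkWriter.forPojo(CityPojo.class, out);
        }
    }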
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]