fapaul commented on a change in pull request #17598:
URL: https://github.com/apache/flink/pull/17598#discussion_r792641970



##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
##########
@@ -109,17 +110,21 @@ public CsvBulkDecodingFormat(ReadableConfig formatOptions) {
             final RowType physicalRowType = (RowType) physicalDataType.getLogicalType();
             final CsvSchema schema = buildCsvSchema(physicalRowType, formatOptions);
 
+            final boolean ignoreParseErrors =
+                    formatOptions.getOptional(IGNORE_PARSE_ERRORS).isPresent();
             final Converter<JsonNode, RowData, Void> converter =
                     (Converter)
-                            new CsvToRowDataConverters(false)
+                            new CsvToRowDataConverters(ignoreParseErrors)
                                     .createRowConverter(projectedRowType, true);
-            return new StreamFormatAdapter<>(
+            CsvReaderFormat<RowData> csvReaderFormat =
                     new CsvReaderFormat<>(
                             new CsvMapper(),
                             schema,
                             JsonNode.class,
                             converter,
-                            context.createTypeInformation(projectedDataType)));
+                            context.createTypeInformation(projectedDataType));
+            csvReaderFormat.setIgnoreParseErrors(ignoreParseErrors);

Review comment:
       Nit: WDYT about passing the complete configuration to the `Reader` class? It feels more extensible in case we also need to pass other configurations to the `Reader`.
   
   In this case, the setter is only invoked once, so it is similar to a builder method.
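To make the suggestion concrete, here is a minimal, self-contained Java sketch (not code from the PR) in which every reader setting travels through one options object passed to the constructor. `CsvReaderOptions` and `SketchReader` are hypothetical names, and a plain `Iterator<String>` stands in for Jackson's `MappingIterator`.

```java
import java.util.Iterator;

// Hypothetical options holder: every reader setting lives in one immutable object,
// so adding an option later does not change the Reader constructor signature.
final class CsvReaderOptions {
    final boolean ignoreParseErrors;

    CsvReaderOptions(boolean ignoreParseErrors) {
        this.ignoreParseErrors = ignoreParseErrors;
    }
}

// Simplified stand-in for the Reader: it receives the complete options object at
// construction time instead of a flag injected later through a setter.
final class SketchReader {
    private final Iterator<String> iterator;
    private final CsvReaderOptions options;

    SketchReader(Iterator<String> iterator, CsvReaderOptions options) {
        this.iterator = iterator;
        this.options = options;
    }

    boolean ignoreParseErrors() {
        return options.ignoreParseErrors;
    }

    // Returns the next raw record, or null at end of input.
    String nextRaw() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}
```

A new setting then becomes one more field on the options object rather than another constructor parameter or setter call.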

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
##########
@@ -155,16 +169,34 @@
     private static final class Reader<R, T> implements StreamFormat.Reader<T> {
         private final MappingIterator<R> iterator;
         private final Converter<R, T, Void> converter;
+        private final boolean ignoreParseErrors;
 
-        public Reader(MappingIterator<R> iterator, Converter<R, T, Void> converter) {
+        public Reader(
+                MappingIterator<R> iterator,
+                Converter<R, T, Void> converter,
+                boolean ignoreParseErrors) {
             this.iterator = checkNotNull(iterator);
             this.converter = checkNotNull(converter);
+            this.ignoreParseErrors = ignoreParseErrors;
         }
 
         @Nullable
         @Override
         public T read() throws IOException {
-            return iterator.hasNext() ? converter.convert(iterator.next(), null) : null;
+            while (true) {

Review comment:
       Why do you now use a while loop?
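One plausible reason for the loop, sketched under assumptions because the `catch` body is not shown in this diff: when `ignoreParseErrors` is set, a malformed record is skipped, so `read()` cannot simply return; it keeps pulling records until one converts or the input ends. In this hypothetical `SkippingReader`, a plain `Iterator` and `java.util.function.Function` stand in for Jackson's `MappingIterator` and Flink's `Converter`, and only `RuntimeException` is caught.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical sketch: skips records that fail conversion when ignoreParseErrors is set.
final class SkippingReader<R, T> {
    private final Iterator<R> iterator;
    private final Function<R, T> converter;
    private final boolean ignoreParseErrors;

    SkippingReader(Iterator<R> iterator, Function<R, T> converter, boolean ignoreParseErrors) {
        this.iterator = iterator;
        this.converter = converter;
        this.ignoreParseErrors = ignoreParseErrors;
    }

    T read() throws IOException {
        // Loop so that a skipped record does not end the stream: continue until a
        // record converts successfully or the input is exhausted.
        while (true) {
            if (!iterator.hasNext()) {
                return null; // end of input
            }
            try {
                R next = iterator.next();
                return converter.apply(next);
            } catch (RuntimeException e) {
                if (!ignoreParseErrors) {
                    throw new IOException("Failed to read record", e);
                }
                // ignoreParseErrors is set: drop this record and try the next one.
            }
        }
    }
}
```

With input `"1", "x", "3"`, `Integer::parseInt` as the converter, and `ignoreParseErrors = true`, successive `read()` calls yield `1`, `3`, then `null`; with the flag off, the second call fails with an `IOException` on `"x"`.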

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
##########
@@ -155,16 +169,34 @@
+            while (true) {
+                try {
+                    if (iterator.hasNext()) {
+                        R nextElement = iterator.next();
+                        return converter.convert(nextElement, null);
+                    } else {
+                        return null;
+                    }
+                } catch (Throwable t) {

Review comment:
       Is it necessary to catch all kinds of throwables here, or is `IOException` enough? Catching everything can be dangerous: for example, if you handle an `InterruptedException`, you also need to restore the thread's interrupted flag.
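For the interrupted-flag point, a generic Java sketch of the usual pattern (not from the PR; `InterruptAwareCall` and `runRestoringInterrupt` are hypothetical names). Blocking JDK methods such as `Thread.sleep` clear the thread's interrupted status when they throw `InterruptedException`, so code that catches the exception without rethrowing it should call `Thread.currentThread().interrupt()` to restore the flag.

```java
// Hypothetical helper: runs a blocking task and, if it is interrupted, swallows the
// InterruptedException but restores the interrupted status for callers up the stack.
final class InterruptAwareCall {
    interface BlockingTask {
        void run() throws InterruptedException;
    }

    static boolean runRestoringInterrupt(BlockingTask task) {
        try {
            task.run();
            return true; // task completed normally
        } catch (InterruptedException e) {
            // Blocking JDK methods clear the status when they throw; set it again.
            Thread.currentThread().interrupt();
            return false; // task was interrupted
        }
    }
}
```

For example, `runRestoringInterrupt(() -> Thread.sleep(1_000))` returns `false` if the thread is interrupted during the sleep, and `Thread.currentThread().isInterrupted()` is `true` afterwards.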



