[
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998947#comment-14998947
]
ASF GitHub Bot commented on FLINK-2692:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/1266#discussion_r44435810
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
package org.apache.flink.api.java.io;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.StringUtils;
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends
GenericCsvInputFormat<OUT> {
private static final long serialVersionUID = 1L;
+
+ public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+ protected transient Object[] parsedValues;
- public CsvInputFormat(Path filePath, CompositeType<OUT>
typeInformation) {
- super(filePath, typeInformation);
+ protected CsvInputFormat(Path filePath) {
+ super(filePath);
}
-
- public CsvInputFormat(Path filePath, String lineDelimiter, String
fieldDelimiter, CompositeType<OUT> typeInformation) {
- super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+ @Override
+ public void open(FileInputSplit split) throws IOException {
+ super.open(split);
+
+ @SuppressWarnings("unchecked")
+ FieldParser<Object>[] fieldParsers = (FieldParser<Object>[])
getFieldParsers();
+
+ //throw exception if no field parsers are available
+ if (fieldParsers.length == 0) {
+ throw new
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to
parse input");
+ }
+
+ // create the value holders
+ this.parsedValues = new Object[fieldParsers.length];
+ for (int i = 0; i < fieldParsers.length; i++) {
+ this.parsedValues[i] = fieldParsers[i].createValue();
+ }
+
+ // left to right evaluation makes access [0] okay
+ // this marker is used to fasten up readRecord, so that it
doesn't have to check each call if the line ending is set to default
+ if (this.getDelimiter().length == 1 && this.getDelimiter()[0]
== '\n' ) {
+ this.lineDelimiterIsLinebreak = true;
+ }
+
+ this.commentCount = 0;
+ this.invalidLineCount = 0;
}
@Override
- protected OUT createTuple(OUT reuse) {
- Tuple result = (Tuple) reuse;
- for (int i = 0; i < parsedValues.length; i++) {
- result.setField(parsedValues[i], i);
+ public OUT nextRecord(OUT record) throws IOException {
+ OUT returnRecord = null;
+ do {
+ returnRecord = super.nextRecord(record);
+ } while (returnRecord == null && !reachedEnd());
+
+ return returnRecord;
+ }
+
+ public Class<?>[] getFieldTypes() {
+ return super.getGenericFieldTypes();
+ }
+
+ protected static boolean[] createDefaultMask(int size) {
--- End diff --
*cover it in an obvious manner
> Untangle CsvInputFormat into PojoTypeCsvInputFormat and
> TupleTypeCsvInputFormat
> --------------------------------------------------------------------------------
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Minor
>
> The {{CsvInputFormat}} currently allows to return values as a {{Tuple}} or a
> {{Pojo}} type. As a consequence, the processing logic, which has to work for
> both types, is overly complex. For example, the {{CsvInputFormat}} contains
> fields which are only used when a Pojo is returned. Moreover, the pojo field
> information are constructed by calling setter methods which have to be called
> in a very specific order, otherwise they fail. E.g. one first has to call
> {{setFieldTypes}} before calling {{setOrderOfPOJOFields}}, otherwise the
> number of fields might be different. Furthermore, some of the methods can
> only be called if the return type is a {{Pojo}} type, because they expect
> that a {{PojoTypeInfo}} is present.
> I think the {{CsvInputFormat}} should be refactored to make the code more
> easily maintainable. I propose to split it up into a
> {{PojoTypeCsvInputFormat}} and a {{TupleTypeCsvInputFormat}} which take all
> the required information via their constructors instead of using the
> {{setFields}} and {{setOrderOfPOJOFields}} approach.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)