[FLINK-2692] Untangle CsvInputFormat

This closes #1266


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd61f2db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd61f2db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd61f2db

Branch: refs/heads/master
Commit: bd61f2dbdf1a0215363ffa8416329e1dbf277593
Parents: fc6fec7
Author: zentol <ches...@apache.org>
Authored: Sun Oct 18 20:23:23 2015 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Nov 18 21:42:00 2015 +0100

----------------------------------------------------------------------
 .../wordcount/BoltTokenizerWordCountPojo.java   |   3 +-
 .../BoltTokenizerWordCountWithNames.java        |   3 +-
 .../flink/api/java/io/CommonCsvInputFormat.java | 258 -------------------
 .../flink/api/java/io/CsvInputFormat.java       | 127 ++++++++-
 .../org/apache/flink/api/java/io/CsvReader.java | 125 ++++-----
 .../flink/api/java/io/PojoCsvInputFormat.java   | 199 ++++++++++++++
 .../flink/api/java/io/TupleCsvInputFormat.java  |  90 +++++++
 .../flink/api/java/tuple/TupleGenerator.java    |  13 +-
 .../java/typeutils/runtime/TupleSerializer.java |   8 +
 .../typeutils/runtime/TupleSerializerBase.java  |   2 +
 .../flink/api/java/io/CsvInputFormatTest.java   |  96 ++-----
 .../flink/python/api/PythonPlanBinder.java      |  13 +-
 .../optimizer/ReplicatingDataSourceTest.java    |  26 +-
 .../scala/operators/ScalaCsvInputFormat.java    |  65 -----
 .../scala/operators/ScalaCsvOutputFormat.java   |   6 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |  56 ++--
 .../scala/typeutils/CaseClassSerializer.scala   |   4 +
 .../flink/api/scala/io/CsvInputFormatTest.scala |  78 ++----
 18 files changed, 573 insertions(+), 599 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
index 9bdcead..e093714 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
@@ -20,6 +20,7 @@ package org.apache.flink.storm.wordcount;
 import backtype.storm.topology.IRichBolt;
 
 import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.PojoCsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -119,7 +120,7 @@ public class BoltTokenizerWordCountPojo {
                        // read the text file from given input path
                        PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo<Sentence>) TypeExtractor
                                        .getForObject(new Sentence(""));
-                       return env.createInput(new CsvInputFormat<Sentence>(new Path(
+                       return env.createInput(new PojoCsvInputFormat<Sentence>(new Path(
                                        textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
                                        CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
                                        sourceType);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
index 019f1bc..f5a1a35 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
@@ -21,6 +21,7 @@ import backtype.storm.topology.IRichBolt;
 import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.TupleCsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -122,7 +123,7 @@ public class BoltTokenizerWordCountWithNames {
                        // read the text file from given input path
                        TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
                                        .getForObject(new Tuple1<String>(""));
-                       return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
+                       return env.createInput(new TupleCsvInputFormat<Tuple1<String>>(new Path(
                                        textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
                                        CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
                                        sourceType);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
deleted file mode 100644
index 444d151..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class CommonCsvInputFormat<OUT> extends 
GenericCsvInputFormat<OUT> {
-
-       private static final long serialVersionUID = 1L;
-
-       public static final String DEFAULT_LINE_DELIMITER = "\n";
-
-       public static final String DEFAULT_FIELD_DELIMITER = ",";
-
-       protected transient Object[] parsedValues;
-
-       private final  Class<OUT> pojoTypeClass;
-
-       private String[] pojoFieldNames;
-
-       private transient PojoTypeInfo<OUT> pojoTypeInfo;
-       private transient Field[] pojoFields;
-
-       public CommonCsvInputFormat(Path filePath, CompositeType<OUT> 
typeInformation) {
-               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
typeInformation);
-       }
-
-       public CommonCsvInputFormat(
-                       Path filePath,
-                       String lineDelimiter,
-                       String fieldDelimiter,
-                       CompositeType<OUT> compositeTypeInfo) {
-               super(filePath);
-
-               setDelimiter(lineDelimiter);
-               setFieldDelimiter(fieldDelimiter);
-
-               Class<?>[] classes = new Class<?>[compositeTypeInfo.getArity()];
-
-               for (int i = 0; i < compositeTypeInfo.getArity(); i++) {
-                       classes[i] = 
compositeTypeInfo.getTypeAt(i).getTypeClass();
-               }
-
-               setFieldTypes(classes);
-
-               if (compositeTypeInfo instanceof PojoTypeInfo) {
-                       pojoTypeInfo = (PojoTypeInfo<OUT>) compositeTypeInfo;
-
-                       pojoTypeClass = compositeTypeInfo.getTypeClass();
-                       setOrderOfPOJOFields(compositeTypeInfo.getFieldNames());
-               } else {
-                       pojoTypeClass = null;
-                       pojoFieldNames = null;
-               }
-       }
-
-       public void setOrderOfPOJOFields(String[] fieldNames) {
-               Preconditions.checkNotNull(pojoTypeClass, "Field order can only 
be specified if output type is a POJO.");
-               Preconditions.checkNotNull(fieldNames);
-
-               int includedCount = 0;
-               for (boolean isIncluded : fieldIncluded) {
-                       if (isIncluded) {
-                               includedCount++;
-                       }
-               }
-
-               Preconditions.checkArgument(includedCount == fieldNames.length, 
includedCount +
-                       " CSV fields and " + fieldNames.length + " POJO fields 
selected. The number of selected CSV and POJO fields must be equal.");
-
-               for (String field : fieldNames) {
-                       Preconditions.checkNotNull(field, "The field name 
cannot be null.");
-                       
Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
-                               "Field \""+ field + "\" is not a member of POJO 
class " + pojoTypeClass.getName());
-               }
-
-               pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, 
fieldNames.length);
-       }
-
-       public void setFieldTypes(Class<?>... fieldTypes) {
-               if (fieldTypes == null || fieldTypes.length == 0) {
-                       throw new IllegalArgumentException("Field types must 
not be null or empty.");
-               }
-
-               setFieldTypesGeneric(fieldTypes);
-       }
-
-       public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
-               Preconditions.checkNotNull(sourceFieldIndices);
-               Preconditions.checkNotNull(fieldTypes);
-
-               checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
-               setFieldsGeneric(sourceFieldIndices, fieldTypes);
-       }
-
-       public  void setFields(boolean[] sourceFieldMask, Class<?>[] 
fieldTypes) {
-               Preconditions.checkNotNull(sourceFieldMask);
-               Preconditions.checkNotNull(fieldTypes);
-
-               setFieldsGeneric(sourceFieldMask, fieldTypes);
-       }
-
-       public Class<?>[] getFieldTypes() {
-               return super.getGenericFieldTypes();
-       }
-
-       @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
-
-               @SuppressWarnings("unchecked")
-               FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) 
getFieldParsers();
-
-               //throw exception if no field parsers are available
-               if (fieldParsers.length == 0) {
-                       throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
-               }
-
-               // create the value holders
-               this.parsedValues = new Object[fieldParsers.length];
-               for (int i = 0; i < fieldParsers.length; i++) {
-                       this.parsedValues[i] = fieldParsers[i].createValue();
-               }
-
-               // left to right evaluation makes access [0] okay
-               // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
-               if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
-                       this.lineDelimiterIsLinebreak = true;
-               }
-
-               // for POJO type
-               if (pojoTypeClass != null) {
-                       pojoFields = new Field[pojoFieldNames.length];
-
-                       Map<String, Field> allFields = new HashMap<String, 
Field>();
-
-                       findAllFields(pojoTypeClass, allFields);
-
-                       for (int i = 0; i < pojoFieldNames.length; i++) {
-                               pojoFields[i] = 
allFields.get(pojoFieldNames[i]);
-
-                               if (pojoFields[i] != null) {
-                                       pojoFields[i].setAccessible(true);
-                               } else {
-                                       throw new RuntimeException("There is no 
field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
-                               }
-                       }
-               }
-
-               this.commentCount = 0;
-               this.invalidLineCount = 0;
-       }
-
-       /**
-        * Finds all declared fields in a class and all its super classes.
-        *
-        * @param clazz Class for which all declared fields are found
-        * @param allFields Map containing all found fields so far
-        */
-       private void findAllFields(Class<?> clazz, Map<String, Field> 
allFields) {
-               for (Field field: clazz.getDeclaredFields()) {
-                       allFields.put(field.getName(), field);
-               }
-
-               if (clazz.getSuperclass() != null) {
-                       findAllFields(clazz.getSuperclass(), allFields);
-               }
-       }
-
-       @Override
-       public OUT nextRecord(OUT record) throws IOException {
-               OUT returnRecord = null;
-               do {
-                       returnRecord = super.nextRecord(record);
-               } while (returnRecord == null && !reachedEnd());
-
-               return returnRecord;
-       }
-
-       @Override
-       public OUT readRecord(OUT reuse, byte[] bytes, int offset, int 
numBytes) throws IOException {
-               /*
-                * Fix to support windows line endings in CSVInputFiles with 
standard delimiter setup = \n
-                */
-               //Find windows end line, so find carriage return before the 
newline
-               if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && 
bytes[offset + numBytes -1] == '\r' ) {
-                       //reduce the number of bytes so that the Carriage 
return is not taken as data
-                       numBytes--;
-               }
-
-               if (commentPrefix != null && commentPrefix.length <= numBytes) {
-                       //check record for comments
-                       boolean isComment = true;
-                       for (int i = 0; i < commentPrefix.length; i++) {
-                               if (commentPrefix[i] != bytes[offset + i]) {
-                                       isComment = false;
-                                       break;
-                               }
-                       }
-                       if (isComment) {
-                               this.commentCount++;
-                               return null;
-                       }
-               }
-
-               if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-                       if (pojoTypeClass == null) {
-                               // result type is tuple
-                               return createTuple(reuse);
-                       } else {
-                               // result type is POJO
-                               for (int i = 0; i < parsedValues.length; i++) {
-                                       try {
-                                               pojoFields[i].set(reuse, 
parsedValues[i]);
-                                       } catch (IllegalAccessException e) {
-                                               throw new 
RuntimeException("Parsed value could not be set in POJO field \"" + 
pojoFieldNames[i] + "\"", e);
-                                       }
-                               }
-                               return reuse;
-                       }
-
-               } else {
-                       this.invalidLineCount++;
-                       return null;
-               }
-       }
-
-       protected abstract OUT createTuple(OUT reuse);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index 7d86f39..8f0aa64 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -18,32 +18,133 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
        private static final long serialVersionUID = 1L;
+
+       public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+       public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+       protected transient Object[] parsedValues;
        
-       public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
-               super(filePath, typeInformation);
+       protected CsvInputFormat(Path filePath) {
+               super(filePath);
        }
-       
-       public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
-               super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+       @Override
+       public void open(FileInputSplit split) throws IOException {
+               super.open(split);
+
+               @SuppressWarnings("unchecked")
+               FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+               //throw exception if no field parsers are available
+               if (fieldParsers.length == 0) {
+                       throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+               }
+
+               // create the value holders
+               this.parsedValues = new Object[fieldParsers.length];
+               for (int i = 0; i < fieldParsers.length; i++) {
+                       this.parsedValues[i] = fieldParsers[i].createValue();
+               }
+
+               // left to right evaluation makes access [0] okay
+               // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
+               if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+                       this.lineDelimiterIsLinebreak = true;
+               }
+
+               this.commentCount = 0;
+               this.invalidLineCount = 0;
+       }
+
+       @Override
+       public OUT nextRecord(OUT record) throws IOException {
+               OUT returnRecord = null;
+               do {
+                       returnRecord = super.nextRecord(record);
+               } while (returnRecord == null && !reachedEnd());
+
+               return returnRecord;
        }
 
        @Override
-       protected OUT createTuple(OUT reuse) {
-               Tuple result = (Tuple) reuse;
-               for (int i = 0; i < parsedValues.length; i++) {
-                       result.setField(parsedValues[i], i);
+       public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+               /*
+                * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
+                */
+               //Find windows end line, so find carriage return before the newline
+               if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
+                       //reduce the number of bytes so that the Carriage return is not taken as data
+                       numBytes--;
+               }
+
+               if (commentPrefix != null && commentPrefix.length <= numBytes) {
+                       //check record for comments
+                       boolean isComment = true;
+                       for (int i = 0; i < commentPrefix.length; i++) {
+                               if (commentPrefix[i] != bytes[offset + i]) {
+                                       isComment = false;
+                                       break;
+                               }
+                       }
+                       if (isComment) {
+                               this.commentCount++;
+                               return null;
+                       }
+               }
+
+               if (parseRecord(parsedValues, bytes, offset, numBytes)) {
+                       return fillRecord(reuse, parsedValues);
+               } else {
+                       this.invalidLineCount++;
+                       return null;
+               }
+       }
+
+       protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
+
+       public Class<?>[] getFieldTypes() {
+               return super.getGenericFieldTypes();
+       }
+
+       protected static boolean[] createDefaultMask(int size) {
+               boolean[] includedMask = new boolean[size];
+               for (int x=0; x<includedMask.length; x++) {
+                       includedMask[x] = true;
+               }
+               return includedMask;
+       }
+
+       protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
+               Preconditions.checkNotNull(sourceFieldIndices);
+
+               for (int i : sourceFieldIndices) {
+                       if (i < 0) {
+                               throw new IllegalArgumentException("Field indices must not be smaller than zero.");
+                       }
+               }
+
+               boolean[] includedMask = new boolean[Ints.max(sourceFieldIndices) + 1];
+
+               // check if we support parsers for these types
+               for (int i = 0; i < sourceFieldIndices.length; i++) {
+                       includedMask[sourceFieldIndices[i]] = true;
                }
 
-               return reuse;
+               return includedMask;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 34b5b47..1bb2eb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -292,19 +292,10 @@ public class CsvReader {
 
                @SuppressWarnings("unchecked")
                PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) 
TypeExtractor.createTypeInfo(pojoType);
-               CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, 
typeInfo);
 
-               Class<?>[] classes = new Class<?>[pojoFields.length];
-               for (int i = 0; i < pojoFields.length; i++) {
-                       int pos = typeInfo.getFieldIndex(pojoFields[i]);
-                       if(pos < 0) {
-                               throw new IllegalArgumentException("Field 
\""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName());
-                       }
-                       classes[i] = 
typeInfo.getPojoFieldAt(pos).getTypeInformation().getTypeClass();
-               }
+               CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, 
this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, 
this.includedMask);
 
-               configureInputFormat(inputFormat, classes);
-               inputFormat.setOrderOfPOJOFields(pojoFields);
+               configureInputFormat(inputFormat);
 
                return new DataSource<T>(executionContext, inputFormat, 
typeInfo, Utils.getCallLocationName());
        }
@@ -325,14 +316,14 @@ public class CsvReader {
                
                @SuppressWarnings("unchecked")
                TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) 
TypeExtractor.createTypeInfo(targetType);
-               CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, 
typeInfo);
+               CsvInputFormat<T> inputFormat = new 
TupleCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, 
this.includedMask);
                
                Class<?>[] classes = new Class<?>[typeInfo.getArity()];
                for (int i = 0; i < typeInfo.getArity(); i++) {
                        classes[i] = typeInfo.getTypeAt(i).getTypeClass();
                }
                
-               configureInputFormat(inputFormat, classes);
+               configureInputFormat(inputFormat);
                return new DataSource<T>(executionContext, inputFormat, 
typeInfo, Utils.getCallLocationName());
        }
        
@@ -340,7 +331,7 @@ public class CsvReader {
        // Miscellaneous
        // 
--------------------------------------------------------------------------------------------
        
-       private void configureInputFormat(CsvInputFormat<?> format, Class<?>... 
types) {
+       private void configureInputFormat(CsvInputFormat<?> format) {
                format.setDelimiter(this.lineDelimiter);
                format.setFieldDelimiter(this.fieldDelimiter);
                format.setCommentPrefix(this.commentPrefix);
@@ -349,12 +340,6 @@ public class CsvReader {
                if (this.parseQuotedStrings) {
                        format.enableQuotedStringParsing(this.quoteCharacter);
                }
-
-               if (this.includedMask == null) {
-                       format.setFieldTypes(types);
-               } else {
-                       format.setFields(this.includedMask, types);
-               }
        }
        
        // 
--------------------------------------------------------------------------------------------
 
@@ -374,8 +359,8 @@ public class CsvReader {
         */
        public <T0> DataSource<Tuple1<T0>> types(Class<T0> type0) {
                TupleTypeInfo<Tuple1<T0>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0);
-               CsvInputFormat<Tuple1<T0>> inputFormat = new 
CsvInputFormat<Tuple1<T0>>(path, types);
-               configureInputFormat(inputFormat, type0);
+               CsvInputFormat<Tuple1<T0>> inputFormat = new 
TupleCsvInputFormat<Tuple1<T0>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple1<T0>>(executionContext, 
inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -391,8 +376,8 @@ public class CsvReader {
         */
        public <T0, T1> DataSource<Tuple2<T0, T1>> types(Class<T0> type0, 
Class<T1> type1) {
                TupleTypeInfo<Tuple2<T0, T1>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1);
-               CsvInputFormat<Tuple2<T0, T1>> inputFormat = new 
CsvInputFormat<Tuple2<T0, T1>>(path, types);
-               configureInputFormat(inputFormat, type0, type1);
+               CsvInputFormat<Tuple2<T0, T1>> inputFormat = new 
TupleCsvInputFormat<Tuple2<T0, T1>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple2<T0, T1>>(executionContext, 
inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -409,8 +394,8 @@ public class CsvReader {
         */
        public <T0, T1, T2> DataSource<Tuple3<T0, T1, T2>> types(Class<T0> 
type0, Class<T1> type1, Class<T2> type2) {
                TupleTypeInfo<Tuple3<T0, T1, T2>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2);
-               CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new 
CsvInputFormat<Tuple3<T0, T1, T2>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2);
+               CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new 
TupleCsvInputFormat<Tuple3<T0, T1, T2>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple3<T0, T1, T2>>(executionContext, 
inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -428,8 +413,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3> DataSource<Tuple4<T0, T1, T2, T3>> 
types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3) {
                TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3);
-               CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new 
CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3);
+               CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new 
TupleCsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, 
inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -448,8 +433,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4> DataSource<Tuple5<T0, T1, T2, T3, T4>> 
types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, 
Class<T4> type4) {
                TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4);
-               CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new 
CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4);
+               CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new 
TupleCsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple5<T0, T1, T2, T3, 
T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -469,8 +454,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5> DataSource<Tuple6<T0, T1, T2, T3, T4, 
T5>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, 
Class<T4> type4, Class<T5> type5) {
                TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5);
-               CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = 
new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5);
+               CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = 
new TupleCsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types, 
this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple6<T0, T1, T2, T3, T4, 
T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -491,8 +476,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6> DataSource<Tuple7<T0, T1, T2, T3, 
T4, T5, T6>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> 
type3, Class<T4> type4, Class<T5> type5, Class<T6> type6) {
                TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6);
-               CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat 
= new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6);
+               CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat 
= new TupleCsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types, 
this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, 
T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -514,8 +499,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7> DataSource<Tuple8<T0, T1, T2, 
T3, T4, T5, T6, T7>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, 
Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> 
type7) {
                TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7);
-               CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> 
inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, 
types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7);
+               CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> 
inputFormat = new TupleCsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, 
T7>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, 
T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -538,8 +523,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8> DataSource<Tuple9<T0, T1, 
T2, T3, T4, T5, T6, T7, T8>> types(Class<T0> type0, Class<T1> type1, Class<T2> 
type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, 
Class<T7> type7, Class<T8> type8) {
                TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types 
= TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8);
-               CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> 
inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, 
T8>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8);
+               CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> 
inputFormat = new TupleCsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, 
T8>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, 
T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -563,8 +548,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> DataSource<Tuple10<T0, 
T1, T2, T3, T4, T5, T6, T7, T8, T9>> types(Class<T0> type0, Class<T1> type1, 
Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> 
type6, Class<T7> type7, Class<T8> type8, Class<T9> type9) {
                TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> 
types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, 
type3, type4, type5, type6, type7, type8, type9);
-               CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> 
inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9);
+               CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> 
inputFormat = new TupleCsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -589,8 +574,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> 
DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> 
types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, 
Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> 
type8, Class<T9> type9, Class<T10> type10) {
                TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, 
type2, type3, type4, type5, type6, type7, type8, type9, type10);
-               CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10);
+               CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10>> inputFormat = new TupleCsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, 
T7, T8, T9, T10>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10>>(executionContext, inputFormat, types, 
Utils.getCallLocationName());
        }
 
@@ -616,8 +601,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> 
DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> 
types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, 
Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> 
type8, Class<T9> type9, Class<T10> type10, Class<T11> type11) {
                TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, 
type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
-               CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, 
T7, T8, T9, T10, T11>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11);
+               CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11>> inputFormat = new TupleCsvInputFormat<Tuple12<T0, T1, T2, T3, T4, 
T5, T6, T7, T8, T9, T10, T11>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11>>(executionContext, inputFormat, types, 
Utils.getCallLocationName());
        }
 
@@ -644,8 +629,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> 
DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> 
types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, 
Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> 
type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> 
type12) {
                TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, 
type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, 
type12);
-               CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, 
T5, T6, T7, T8, T9, T10, T11, T12>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12);
+               CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12>> inputFormat = new TupleCsvInputFormat<Tuple13<T0, T1, T2, T3, 
T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, 
Utils.getCallLocationName());
        }
 
@@ -673,8 +658,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> 
DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> 
types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, 
Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> 
type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> 
type12, Class<T13> type13) {
                TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
-               CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, 
T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
+               CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13>> inputFormat = new TupleCsvInputFormat<Tuple14<T0, T1, T2, 
T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types, 
this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, 
Utils.getCallLocationName());
        }
 
@@ -703,8 +688,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14> DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> 
type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, 
Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, 
Class<T12> type12, Class<T13> type13, Class<T14> type14) {
                TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14);
-               CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, 
T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14);
+               CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14>> inputFormat = new TupleCsvInputFormat<Tuple15<T0, T1, 
T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types, 
this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, 
Utils.getCallLocationName());
        }
 
@@ -734,8 +719,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15> DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, 
Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> 
type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, 
Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15) {
                TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15);
-               CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, 
T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15);
+               CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15>> inputFormat = new 
TupleCsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, 
Utils.getCallLocationName());
        }
 
@@ -766,8 +751,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16> DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, 
T11, T12, T13, T14, T15, T16>> types(Class<T0> type0, Class<T1> type1, 
Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> 
type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, 
Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, 
Class<T15> type15, Class<T16> type16) {
                TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16);
-               CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16>> inputFormat = new 
CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16);
+               CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16>> inputFormat = new 
TupleCsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15, T16>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, 
types, Utils.getCallLocationName());
        }
 
@@ -799,8 +784,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17> DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17>> types(Class<T0> type0, Class<T1> 
type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, 
Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> 
type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> 
type14, Class<T15> type15, Class<T16> type16, Class<T17> type17) {
                TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17);
-               CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new 
CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17);
+               CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new 
TupleCsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15, T16, T17>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, 
types, Utils.getCallLocationName());
        }
 
@@ -833,8 +818,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17, T18> DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types(Class<T0> type0, 
Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> 
type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, 
Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, 
Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, 
Class<T18> type18) {
                TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18);
-               CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new 
CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18);
+               CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new 
TupleCsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15, T16, T17, T18>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, 
inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -868,8 +853,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17, T18, T19> DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, 
T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types(Class<T0> 
type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, 
Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> 
type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> 
type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> 
type17, Class<T18> type18, Class<T19> type19) {
                TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19);
-               CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new 
CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18, T19>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19);
+               CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new 
TupleCsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15, T16, T17, T18, T19>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, 
inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -904,8 +889,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17, T18, T19, T20> DataSource<Tuple21<T0, T1, T2, T3, T4, T5, 
T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> 
types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, 
Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> 
type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> 
type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> 
type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> 
type20) {
                TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20);
-               CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new 
CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18, T19, T20>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20);
+               CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new 
TupleCsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, 
T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -941,8 +926,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17, T18, T19, T20, T21> DataSource<Tuple22<T0, T1, T2, T3, T4, 
T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, 
T21>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, 
Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> 
type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> 
type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> 
type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> 
type20, Class<T21> type21) {
                TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20, type21);
-               CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new 
CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20, type21);
+               CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new 
TupleCsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types, 
this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, 
T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -979,8 +964,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17, T18, T19, T20, T21, T22> DataSource<Tuple23<T0, T1, T2, T3, 
T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, 
T21, T22>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> 
type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, 
Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, 
Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, 
Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, 
Class<T20> type20, Class<T21> type21, Class<T22> type22) {
                TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20, type21, type22);
-               CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = 
new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20, type21, type22);
+               CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = 
new TupleCsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, 
T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types, 
this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, 
T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -1018,8 +1003,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> DataSource<Tuple24<T0, T1, 
T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, 
T19, T20, T21, T22, T23>> types(Class<T0> type0, Class<T1> type1, Class<T2> 
type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, 
Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, 
Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, 
Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, 
Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, 
Class<T23> type23) {
                TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
-               CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> 
inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, 
T23>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
+               CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> 
inputFormat = new TupleCsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, 
T23>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, 
T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 
@@ -1058,8 +1043,8 @@ public class CsvReader {
         */
        public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> DataSource<Tuple25<T0, 
T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, 
T18, T19, T20, T21, T22, T23, T24>> types(Class<T0> type0, Class<T1> type1, 
Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> 
type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, 
Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, 
Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, 
Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, 
Class<T23> type23, Class<T24> type24) {
                TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> 
types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, 
type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, 
type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, 
type23, type24);
-               CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> 
inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, 
T24>>(path, types);
-               configureInputFormat(inputFormat, type0, type1, type2, type3, 
type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, 
type24);
+               CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> 
inputFormat = new TupleCsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, 
T24>>(path, types, this.includedMask);
+               configureInputFormat(inputFormat);
                return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, 
T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
new file mode 100644
index 0000000..2f1139c
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * CSV input format that writes the parsed values of each record into the fields of a POJO.
+ *
+ * <p>The POJO fields to fill are selected by name: the i-th field name receives the i-th
+ * parsed value of a record. The fields are looked up reflectively in
+ * {@link #open(FileInputSplit)}, searching the POJO class and all of its super classes.
+ *
+ * @param <OUT> type of the POJO produced by this format
+ */
+public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Class of the produced POJO; used in open() to look up the fields reflectively. */
+       private Class<OUT> pojoTypeClass;
+
+       /** Names of the POJO fields to fill; the i-th name receives the i-th parsed value. */
+       private String[] pojoFieldNames;
+
+       /** Type information of the POJO; only read while the format is being configured. */
+       private transient PojoTypeInfo<OUT> pojoTypeInfo;
+
+       /** Reflective handles matching pojoFieldNames; rebuilt on every call to open(). */
+       private transient Field[] pojoFields;
+
+       // ------------------------------------------------------------------------
+       //  Constructors without an explicit mask: every CSV field is included
+       // ------------------------------------------------------------------------
+
+       public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo);
+       }
+
+       public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+               // The default mask needs one entry per *selected* POJO field, exactly like the
+               // delimiter-taking variant below. Sizing it by the POJO arity made the
+               // "number of selected CSV and POJO fields must be equal" check fail whenever
+               // only a subset of the POJO fields was named.
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length));
+       }
+
+       public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+               this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+       }
+
+       public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+               this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length));
+       }
+
+       // ------------------------------------------------------------------------
+       //  Constructors taking an int[] mask (converted with toBooleanMask)
+       // ------------------------------------------------------------------------
+
+       public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+               // Delegate to the main int[] constructor so that a null mask is handled the
+               // same way as in all other int[] variants instead of being converted eagerly.
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+       }
+
+       public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+       }
+
+       public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+               this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+       }
+
+       /**
+        * Creates a format from an int[] mask.
+        *
+        * @param filePath path of the file to read
+        * @param lineDelimiter delimiter between records
+        * @param fieldDelimiter delimiter between the fields of a record
+        * @param pojoTypeInfo type information of the produced POJO
+        * @param fieldNames names of the POJO fields to fill
+        * @param includedFieldsMask mask selecting the CSV fields to read; if {@code null},
+        *        a default mask with one entry per field name is used
+        */
+       public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+               super(filePath);
+               boolean[] mask = (includedFieldsMask == null)
+                               ? createDefaultMask(fieldNames.length)
+                               : toBooleanMask(includedFieldsMask);
+               configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, mask);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Constructors taking a boolean[] mask
+       // ------------------------------------------------------------------------
+
+       public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+       }
+
+       public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+       }
+
+       public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+               this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+       }
+
+       /**
+        * Creates a format from a boolean[] mask.
+        *
+        * @param filePath path of the file to read
+        * @param lineDelimiter delimiter between records
+        * @param fieldDelimiter delimiter between the fields of a record
+        * @param pojoTypeInfo type information of the produced POJO
+        * @param fieldNames names of the POJO fields to fill
+        * @param includedFieldsMask mask selecting the CSV fields to read; if {@code null},
+        *        a default mask with one entry per field name is used
+        */
+       public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+               super(filePath);
+               configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, includedFieldsMask);
+       }
+
+       /**
+        * Shared constructor logic: validates the field names, derives the field types from
+        * the POJO type information and hands delimiters, mask and types to the super class.
+        *
+        * @throws NullPointerException if a field name is {@code null}
+        * @throws IllegalArgumentException if a field name is not part of the POJO type
+        */
+       private void configure(String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+
+               if (includedFieldsMask == null) {
+                       includedFieldsMask = createDefaultMask(fieldNames.length);
+               }
+
+               // Validate each name and resolve its type in one pass. The index returned by
+               // getFieldIndex is checked before it is used, so no second lookup (and no
+               // IndexOutOfBoundsException handling) is needed.
+               Class<?>[] classes = new Class<?>[fieldNames.length];
+               for (int i = 0; i < fieldNames.length; i++) {
+                       String name = fieldNames[i];
+                       if (name == null) {
+                               throw new NullPointerException("Field name must not be null.");
+                       }
+                       int fieldIndex = pojoTypeInfo.getFieldIndex(name);
+                       if (fieldIndex < 0) {
+                               throw new IllegalArgumentException("Field \"" + name + "\" not part of POJO type " + pojoTypeInfo.getTypeClass().getCanonicalName());
+                       }
+                       classes[i] = pojoTypeInfo.getTypeAt(fieldIndex).getTypeClass();
+               }
+
+               setDelimiter(lineDelimiter);
+               setFieldDelimiter(fieldDelimiter);
+
+               this.pojoTypeClass = pojoTypeInfo.getTypeClass();
+               this.pojoTypeInfo = pojoTypeInfo;
+               setFieldsGeneric(includedFieldsMask, classes);
+               setOrderOfPOJOFields(fieldNames);
+       }
+
+       /**
+        * Stores the names of the POJO fields to fill, after checking that there is exactly
+        * one name per included CSV field.
+        *
+        * <p>Only called from configure(), which has already verified that every name is
+        * non-null and part of the POJO type.
+        *
+        * @throws IllegalArgumentException if the number of included CSV fields differs from
+        *         the number of field names
+        */
+       private void setOrderOfPOJOFields(String[] fieldNames) {
+               int includedCount = 0;
+               for (boolean isIncluded : fieldIncluded) {
+                       if (isIncluded) {
+                               includedCount++;
+                       }
+               }
+
+               Preconditions.checkArgument(includedCount == fieldNames.length, includedCount
+                               + " CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
+
+               // Defensive copy: later changes to the caller's array must not affect this format.
+               pojoFieldNames = Arrays.copyOf(fieldNames, fieldNames.length);
+       }
+
+       /**
+        * Opens the split and resolves a reflective {@link Field} handle for every selected
+        * POJO field, making each handle accessible.
+        *
+        * @throws RuntimeException if the POJO class hierarchy declares no field with one of
+        *         the selected names
+        */
+       @Override
+       public void open(FileInputSplit split) throws IOException {
+               super.open(split);
+
+               Map<String, Field> allFields = new HashMap<String, Field>();
+               findAllFields(pojoTypeClass, allFields);
+
+               pojoFields = new Field[pojoFieldNames.length];
+               for (int i = 0; i < pojoFieldNames.length; i++) {
+                       Field field = allFields.get(pojoFieldNames[i]);
+                       if (field == null) {
+                               throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
+                       }
+                       field.setAccessible(true);
+                       pojoFields[i] = field;
+               }
+       }
+
+       /**
+        * Finds all declared fields in a class and all its super classes.
+        *
+        * <p>NOTE(review): the class's own fields are added before its super classes are
+        * visited, so for fields with the same name the declaration in the super class
+        * replaces the one in the sub class. Confirm this is the intended precedence.
+        *
+        * @param clazz Class for which all declared fields are found
+        * @param allFields Map containing all found fields so far
+        */
+       private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
+               for (Field field : clazz.getDeclaredFields()) {
+                       allFields.put(field.getName(), field);
+               }
+
+               if (clazz.getSuperclass() != null) {
+                       findAllFields(clazz.getSuperclass(), allFields);
+               }
+       }
+
+       /**
+        * Writes the parsed values into the given POJO instance, value i into the i-th
+        * selected field.
+        *
+        * @param reuse POJO instance to fill
+        * @param parsedValues values parsed from one record
+        * @return the filled {@code reuse} instance
+        * @throws RuntimeException if a field cannot be set reflectively
+        */
+       @Override
+       public OUT fillRecord(OUT reuse, Object[] parsedValues) {
+               for (int i = 0; i < parsedValues.length; i++) {
+                       try {
+                               pojoFields[i].set(reuse, parsedValues[i]);
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
+                       }
+               }
+               return reuse;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
new file mode 100644
index 0000000..82caddd
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.core.fs.Path;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+
+/**
+ * CSV input format that turns the parsed values of each record into a tuple.
+ *
+ * <p>The tuple is produced by the {@link TupleSerializerBase} created from the tuple type
+ * information handed to the constructor.
+ *
+ * @param <OUT> type of the tuple produced by this format
+ */
+public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Serializer used by fillRecord to build the tuple from the parsed values. */
+       private TupleSerializerBase<OUT> tupleSerializer;
+
+       // --- no explicit mask: one included field per tuple position -------------
+
+       public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
+       }
+
+       public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
+               this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
+       }
+
+       // --- int[] mask ----------------------------------------------------------
+
+       public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
+       }
+
+       /**
+        * Creates a format from an int[] mask; a {@code null} mask selects the default mask
+        * with one entry per tuple field.
+        */
+       public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
+               super(filePath);
+
+               boolean[] booleanMask;
+               if (includedFieldsMask != null) {
+                       booleanMask = toBooleanMask(includedFieldsMask);
+               } else {
+                       booleanMask = createDefaultMask(tupleTypeInfo.getArity());
+               }
+
+               configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, booleanMask);
+       }
+
+       // --- boolean[] mask ------------------------------------------------------
+
+       public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
+       }
+
+       /**
+        * Creates a format from a boolean[] mask; a {@code null} mask selects the default
+        * mask with one entry per tuple field.
+        */
+       public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+               super(filePath);
+               configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
+       }
+
+       /**
+        * Shared constructor logic: creates the tuple serializer, sets the delimiters and
+        * registers the field types of the tuple together with the inclusion mask.
+        *
+        * @throws IllegalArgumentException if the tuple type has no fields
+        */
+       private void configure(String lineDelimiter, String fieldDelimiter,
+                       TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+
+               final int arity = tupleTypeInfo.getArity();
+               if (arity == 0) {
+                       throw new IllegalArgumentException("Tuple size must be greater than 0.");
+               }
+
+               final boolean[] mask = (includedFieldsMask != null)
+                               ? includedFieldsMask
+                               : createDefaultMask(arity);
+
+               tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
+
+               setDelimiter(lineDelimiter);
+               setFieldDelimiter(fieldDelimiter);
+
+               // One type per tuple position, in positional order.
+               final Class<?>[] fieldTypes = new Class<?>[arity];
+               for (int pos = 0; pos < fieldTypes.length; pos++) {
+                       fieldTypes[pos] = tupleTypeInfo.getTypeAt(pos).getTypeClass();
+               }
+
+               setFieldsGeneric(mask, fieldTypes);
+       }
+
+       /**
+        * Builds the tuple for one record by delegating to the tuple serializer.
+        *
+        * @param reuse tuple instance offered for reuse
+        * @param parsedValues values parsed from one record
+        * @return the tuple returned by the serializer
+        */
+       @Override
+       public OUT fillRecord(OUT reuse, Object[] parsedValues) {
+               return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index a2d37ce..70b9393 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -469,19 +469,12 @@ class TupleGenerator {
                        // create csv input format
                        sb.append("\t\tCsvInputFormat<Tuple" + numFields + "<");
                        appendTupleTypeGenerics(sb, numFields);
-                       sb.append(">> inputFormat = new CsvInputFormat<Tuple" + 
numFields + "<");
+                       sb.append(">> inputFormat = new 
TupleCsvInputFormat<Tuple" + numFields + "<");
                        appendTupleTypeGenerics(sb, numFields);
-                       sb.append(">>(path, types);\n");
+                       sb.append(">>(path, types, this.includedMask);\n");
 
                        // configure input format
-                       sb.append("\t\tconfigureInputFormat(inputFormat, ");
-                       for (int i = 0; i < numFields; i++) {
-                               if (i > 0) {
-                                       sb.append(", ");
-                               }
-                               sb.append("type" + i);
-                       }
-                       sb.append(");\n");
+                       sb.append("\t\tconfigureInputFormat(inputFormat);\n");
 
                        // return
                        sb.append("\t\treturn new DataSource<Tuple" + numFields 
+ "<");

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 46e3990..0897063 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -89,6 +89,14 @@ public class TupleSerializer<T extends Tuple> extends 
TupleSerializerBase<T> {
        }
 
        @Override
+       public T createOrReuseInstance(Object[] fields, T reuse) {
+               for (int i = 0; i < arity; i++) {
+                       reuse.setField(fields[i], i);
+               }
+               return reuse;
+       }
+
+       @Override
        public T copy(T from) {
                T target = instantiateRaw();
                for (int i = 0; i < arity; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index bf3c7a1..fc657a1 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -67,6 +67,8 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
        // of immutable Typles (i.e. Scala Tuples)
        public abstract T createInstance(Object[] fields);
 
+       public abstract T createOrReuseInstance(Object[] fields, T reuse);
+
        @Override
        public void copy(DataInputView source, DataOutputView target) throws 
IOException {
                for (int i = 0; i < arity; i++) {

Reply via email to