[FLINK-2692] Untangle CsvInputFormat This closes #1266
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd61f2db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd61f2db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd61f2db Branch: refs/heads/master Commit: bd61f2dbdf1a0215363ffa8416329e1dbf277593 Parents: fc6fec7 Author: zentol <ches...@apache.org> Authored: Sun Oct 18 20:23:23 2015 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Nov 18 21:42:00 2015 +0100 ---------------------------------------------------------------------- .../wordcount/BoltTokenizerWordCountPojo.java | 3 +- .../BoltTokenizerWordCountWithNames.java | 3 +- .../flink/api/java/io/CommonCsvInputFormat.java | 258 ------------------- .../flink/api/java/io/CsvInputFormat.java | 127 ++++++++- .../org/apache/flink/api/java/io/CsvReader.java | 125 ++++----- .../flink/api/java/io/PojoCsvInputFormat.java | 199 ++++++++++++++ .../flink/api/java/io/TupleCsvInputFormat.java | 90 +++++++ .../flink/api/java/tuple/TupleGenerator.java | 13 +- .../java/typeutils/runtime/TupleSerializer.java | 8 + .../typeutils/runtime/TupleSerializerBase.java | 2 + .../flink/api/java/io/CsvInputFormatTest.java | 96 ++----- .../flink/python/api/PythonPlanBinder.java | 13 +- .../optimizer/ReplicatingDataSourceTest.java | 26 +- .../scala/operators/ScalaCsvInputFormat.java | 65 ----- .../scala/operators/ScalaCsvOutputFormat.java | 6 +- .../flink/api/scala/ExecutionEnvironment.scala | 56 ++-- .../scala/typeutils/CaseClassSerializer.scala | 4 + .../flink/api/scala/io/CsvInputFormatTest.scala | 78 ++---- 18 files changed, 573 insertions(+), 599 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java index 9bdcead..e093714 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java @@ -20,6 +20,7 @@ package org.apache.flink.storm.wordcount; import backtype.storm.topology.IRichBolt; import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.PojoCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -119,7 +120,7 @@ public class BoltTokenizerWordCountPojo { // read the text file from given input path PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo<Sentence>) TypeExtractor .getForObject(new Sentence("")); - return env.createInput(new CsvInputFormat<Sentence>(new Path( + return env.createInput(new PojoCsvInputFormat<Sentence>(new Path( textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType), sourceType); http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java index 019f1bc..f5a1a35 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java +++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java @@ -21,6 +21,7 @@ import backtype.storm.topology.IRichBolt; import backtype.storm.tuple.Fields; import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.TupleCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; @@ -122,7 +123,7 @@ public class BoltTokenizerWordCountWithNames { // read the text file from given input path TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor .getForObject(new Tuple1<String>("")); - return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path( + return env.createInput(new TupleCsvInputFormat<Tuple1<String>>(new Path( textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType), sourceType); http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java deleted file mode 100644 index 444d151..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.io.GenericCsvInputFormat; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.parser.FieldParser; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -public abstract class CommonCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> { - - private static final long serialVersionUID = 1L; - - public static final String DEFAULT_LINE_DELIMITER = "\n"; - - public static final String DEFAULT_FIELD_DELIMITER = ","; - - protected transient Object[] parsedValues; - - private final Class<OUT> pojoTypeClass; - - private String[] pojoFieldNames; - - private transient PojoTypeInfo<OUT> pojoTypeInfo; - private transient Field[] pojoFields; - - public CommonCsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) { - this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation); - } - - public CommonCsvInputFormat( - Path filePath, - String lineDelimiter, - String fieldDelimiter, - CompositeType<OUT> compositeTypeInfo) { - super(filePath); - - setDelimiter(lineDelimiter); - setFieldDelimiter(fieldDelimiter); - - Class<?>[] classes = new Class<?>[compositeTypeInfo.getArity()]; - - for (int i = 0; i < compositeTypeInfo.getArity(); i++) { - 
classes[i] = compositeTypeInfo.getTypeAt(i).getTypeClass(); - } - - setFieldTypes(classes); - - if (compositeTypeInfo instanceof PojoTypeInfo) { - pojoTypeInfo = (PojoTypeInfo<OUT>) compositeTypeInfo; - - pojoTypeClass = compositeTypeInfo.getTypeClass(); - setOrderOfPOJOFields(compositeTypeInfo.getFieldNames()); - } else { - pojoTypeClass = null; - pojoFieldNames = null; - } - } - - public void setOrderOfPOJOFields(String[] fieldNames) { - Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO."); - Preconditions.checkNotNull(fieldNames); - - int includedCount = 0; - for (boolean isIncluded : fieldIncluded) { - if (isIncluded) { - includedCount++; - } - } - - Preconditions.checkArgument(includedCount == fieldNames.length, includedCount + - " CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal."); - - for (String field : fieldNames) { - Preconditions.checkNotNull(field, "The field name cannot be null."); - Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1, - "Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName()); - } - - pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length); - } - - public void setFieldTypes(Class<?>... 
fieldTypes) { - if (fieldTypes == null || fieldTypes.length == 0) { - throw new IllegalArgumentException("Field types must not be null or empty."); - } - - setFieldTypesGeneric(fieldTypes); - } - - public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) { - Preconditions.checkNotNull(sourceFieldIndices); - Preconditions.checkNotNull(fieldTypes); - - checkForMonotonousOrder(sourceFieldIndices, fieldTypes); - - setFieldsGeneric(sourceFieldIndices, fieldTypes); - } - - public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) { - Preconditions.checkNotNull(sourceFieldMask); - Preconditions.checkNotNull(fieldTypes); - - setFieldsGeneric(sourceFieldMask, fieldTypes); - } - - public Class<?>[] getFieldTypes() { - return super.getGenericFieldTypes(); - } - - @Override - public void open(FileInputSplit split) throws IOException { - super.open(split); - - @SuppressWarnings("unchecked") - FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers(); - - //throw exception if no field parsers are available - if (fieldParsers.length == 0) { - throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input"); - } - - // create the value holders - this.parsedValues = new Object[fieldParsers.length]; - for (int i = 0; i < fieldParsers.length; i++) { - this.parsedValues[i] = fieldParsers[i].createValue(); - } - - // left to right evaluation makes access [0] okay - // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default - if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) { - this.lineDelimiterIsLinebreak = true; - } - - // for POJO type - if (pojoTypeClass != null) { - pojoFields = new Field[pojoFieldNames.length]; - - Map<String, Field> allFields = new HashMap<String, Field>(); - - findAllFields(pojoTypeClass, allFields); - - for (int i = 0; i < pojoFieldNames.length; i++) { - pojoFields[i] = 
allFields.get(pojoFieldNames[i]); - - if (pojoFields[i] != null) { - pojoFields[i].setAccessible(true); - } else { - throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName()); - } - } - } - - this.commentCount = 0; - this.invalidLineCount = 0; - } - - /** - * Finds all declared fields in a class and all its super classes. - * - * @param clazz Class for which all declared fields are found - * @param allFields Map containing all found fields so far - */ - private void findAllFields(Class<?> clazz, Map<String, Field> allFields) { - for (Field field: clazz.getDeclaredFields()) { - allFields.put(field.getName(), field); - } - - if (clazz.getSuperclass() != null) { - findAllFields(clazz.getSuperclass(), allFields); - } - } - - @Override - public OUT nextRecord(OUT record) throws IOException { - OUT returnRecord = null; - do { - returnRecord = super.nextRecord(record); - } while (returnRecord == null && !reachedEnd()); - - return returnRecord; - } - - @Override - public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException { - /* - * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n - */ - //Find windows end line, so find carriage return before the newline - if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) { - //reduce the number of bytes so that the Carriage return is not taken as data - numBytes--; - } - - if (commentPrefix != null && commentPrefix.length <= numBytes) { - //check record for comments - boolean isComment = true; - for (int i = 0; i < commentPrefix.length; i++) { - if (commentPrefix[i] != bytes[offset + i]) { - isComment = false; - break; - } - } - if (isComment) { - this.commentCount++; - return null; - } - } - - if (parseRecord(parsedValues, bytes, offset, numBytes)) { - if (pojoTypeClass == null) { - // result type is tuple - return createTuple(reuse); - } else { - // result type 
is POJO - for (int i = 0; i < parsedValues.length; i++) { - try { - pojoFields[i].set(reuse, parsedValues[i]); - } catch (IllegalAccessException e) { - throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e); - } - } - return reuse; - } - - } else { - this.invalidLineCount++; - return null; - } - } - - protected abstract OUT createTuple(OUT reuse); -} http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java index 7d86f39..8f0aa64 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java @@ -18,32 +18,133 @@ package org.apache.flink.api.java.io; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.flink.api.common.io.GenericCsvInputFormat; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.tuple.Tuple; +import java.io.IOException; import org.apache.flink.core.fs.Path; import org.apache.flink.util.StringUtils; -public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> { +public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> { private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + protected transient Object[] parsedValues; - public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) { - super(filePath, typeInformation); + protected 
CsvInputFormat(Path filePath) { + super(filePath); } - - public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) { - super(filePath, lineDelimiter, fieldDelimiter, typeInformation); + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + + @SuppressWarnings("unchecked") + FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers(); + + //throw exception if no field parsers are available + if (fieldParsers.length == 0) { + throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input"); + } + + // create the value holders + this.parsedValues = new Object[fieldParsers.length]; + for (int i = 0; i < fieldParsers.length; i++) { + this.parsedValues[i] = fieldParsers[i].createValue(); + } + + // left to right evaluation makes access [0] okay + // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default + if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) { + this.lineDelimiterIsLinebreak = true; + } + + this.commentCount = 0; + this.invalidLineCount = 0; + } + + @Override + public OUT nextRecord(OUT record) throws IOException { + OUT returnRecord = null; + do { + returnRecord = super.nextRecord(record); + } while (returnRecord == null && !reachedEnd()); + + return returnRecord; } @Override - protected OUT createTuple(OUT reuse) { - Tuple result = (Tuple) reuse; - for (int i = 0; i < parsedValues.length; i++) { - result.setField(parsedValues[i], i); + public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException { + /* + * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n + */ + //Find windows end line, so find carriage return before the newline + if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') { + //reduce 
the number of bytes so that the Carriage return is not taken as data + numBytes--; + } + + if (commentPrefix != null && commentPrefix.length <= numBytes) { + //check record for comments + boolean isComment = true; + for (int i = 0; i < commentPrefix.length; i++) { + if (commentPrefix[i] != bytes[offset + i]) { + isComment = false; + break; + } + } + if (isComment) { + this.commentCount++; + return null; + } + } + + if (parseRecord(parsedValues, bytes, offset, numBytes)) { + return fillRecord(reuse, parsedValues); + } else { + this.invalidLineCount++; + return null; + } + } + + protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues); + + public Class<?>[] getFieldTypes() { + return super.getGenericFieldTypes(); + } + + protected static boolean[] createDefaultMask(int size) { + boolean[] includedMask = new boolean[size]; + for (int x=0; x<includedMask.length; x++) { + includedMask[x] = true; + } + return includedMask; + } + + protected static boolean[] toBooleanMask(int[] sourceFieldIndices) { + Preconditions.checkNotNull(sourceFieldIndices); + + for (int i : sourceFieldIndices) { + if (i < 0) { + throw new IllegalArgumentException("Field indices must not be smaller than zero."); + } + } + + boolean[] includedMask = new boolean[Ints.max(sourceFieldIndices) + 1]; + + // check if we support parsers for these types + for (int i = 0; i < sourceFieldIndices.length; i++) { + includedMask[sourceFieldIndices[i]] = true; } - return reuse; + return includedMask; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index 34b5b47..1bb2eb9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -292,19 +292,10 @@ public class CsvReader { @SuppressWarnings("unchecked") PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType); - CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo); - Class<?>[] classes = new Class<?>[pojoFields.length]; - for (int i = 0; i < pojoFields.length; i++) { - int pos = typeInfo.getFieldIndex(pojoFields[i]); - if(pos < 0) { - throw new IllegalArgumentException("Field \""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName()); - } - classes[i] = typeInfo.getPojoFieldAt(pos).getTypeInformation().getTypeClass(); - } + CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, this.includedMask); - configureInputFormat(inputFormat, classes); - inputFormat.setOrderOfPOJOFields(pojoFields); + configureInputFormat(inputFormat); return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName()); } @@ -325,14 +316,14 @@ public class CsvReader { @SuppressWarnings("unchecked") TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType); - CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo); + CsvInputFormat<T> inputFormat = new TupleCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask); Class<?>[] classes = new Class<?>[typeInfo.getArity()]; for (int i = 0; i < typeInfo.getArity(); i++) { classes[i] = typeInfo.getTypeAt(i).getTypeClass(); } - configureInputFormat(inputFormat, classes); + configureInputFormat(inputFormat); return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName()); } @@ -340,7 +331,7 @@ public class CsvReader { // Miscellaneous // -------------------------------------------------------------------------------------------- - private void configureInputFormat(CsvInputFormat<?> 
format, Class<?>... types) { + private void configureInputFormat(CsvInputFormat<?> format) { format.setDelimiter(this.lineDelimiter); format.setFieldDelimiter(this.fieldDelimiter); format.setCommentPrefix(this.commentPrefix); @@ -349,12 +340,6 @@ public class CsvReader { if (this.parseQuotedStrings) { format.enableQuotedStringParsing(this.quoteCharacter); } - - if (this.includedMask == null) { - format.setFieldTypes(types); - } else { - format.setFields(this.includedMask, types); - } } // -------------------------------------------------------------------------------------------- @@ -374,8 +359,8 @@ public class CsvReader { */ public <T0> DataSource<Tuple1<T0>> types(Class<T0> type0) { TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0); - CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path, types); - configureInputFormat(inputFormat, type0); + CsvInputFormat<Tuple1<T0>> inputFormat = new TupleCsvInputFormat<Tuple1<T0>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -391,8 +376,8 @@ public class CsvReader { */ public <T0, T1> DataSource<Tuple2<T0, T1>> types(Class<T0> type0, Class<T1> type1) { TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1); - CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path, types); - configureInputFormat(inputFormat, type0, type1); + CsvInputFormat<Tuple2<T0, T1>> inputFormat = new TupleCsvInputFormat<Tuple2<T0, T1>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -409,8 +394,8 @@ public class CsvReader { */ public <T0, T1, T2> DataSource<Tuple3<T0, T1, T2>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2) { 
TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2); - CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2); + CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new TupleCsvInputFormat<Tuple3<T0, T1, T2>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -428,8 +413,8 @@ public class CsvReader { */ public <T0, T1, T2, T3> DataSource<Tuple4<T0, T1, T2, T3>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3) { TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3); - CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3); + CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new TupleCsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -448,8 +433,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4> DataSource<Tuple5<T0, T1, T2, T3, T4>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4) { TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4); - CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4); + CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new TupleCsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types, 
this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -469,8 +454,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5> DataSource<Tuple6<T0, T1, T2, T3, T4, T5>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5) { TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5); - CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5); + CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new TupleCsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -491,8 +476,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6> DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6) { TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6); - CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6); + CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new TupleCsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, 
types, Utils.getCallLocationName()); } @@ -514,8 +499,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7> DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7) { TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7); - CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7); + CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new TupleCsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -538,8 +523,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8> DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8) { TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8); - CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8); + CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new TupleCsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types, this.includedMask); + 
configureInputFormat(inputFormat); return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -563,8 +548,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9) { TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9); - CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9); + CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new TupleCsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -589,8 +574,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10) { TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10); - CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new 
CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10); + CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new TupleCsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -616,8 +601,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11) { TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11); - CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11); + CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new TupleCsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -644,8 +629,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> 
DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12) { TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12); - CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12); + CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new TupleCsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -673,8 +658,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13) { TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13); - CsvInputFormat<Tuple14<T0, T1, 
T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13); + CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new TupleCsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -703,8 +688,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14) { TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14); - CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14); + CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new TupleCsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9, T10, T11, T12, T13, T14>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -734,8 +719,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15) { TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15); - CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15); + CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new TupleCsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -766,8 +751,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16> DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16) { TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16); - CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16); + CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new TupleCsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -799,8 +784,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> 
type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17) { TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17); - CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17); + CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new TupleCsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -833,8 +818,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18) { TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15, T16, T17, T18>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18); - CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18); + CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new TupleCsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -868,8 +853,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19) { TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, 
type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19); - CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19); + CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new TupleCsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -904,8 +889,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20) { TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, 
type14, type15, type16, type17, type18, type19, type20); - CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20); + CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new TupleCsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -941,8 +926,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21) { TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, 
type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21); - CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21); + CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new TupleCsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -979,8 +964,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22) { TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22); - CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22); + CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new TupleCsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -1018,8 +1003,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> 
type22, Class<T23> type23) { TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23); - CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23); + CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new TupleCsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -1058,8 +1043,8 @@ public class CsvReader { */ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, 
Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23, Class<T24> type24) { TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24); - CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24); + CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new TupleCsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName()); } http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java ---------------------------------------------------------------------- diff 
--git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java new file mode 100644 index 0000000..2f1139c --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.io; + +import com.google.common.base.Preconditions; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> { + + private static final long serialVersionUID = 1L; + + private Class<OUT> pojoTypeClass; + + private String[] pojoFieldNames; + + private transient PojoTypeInfo<OUT> pojoTypeInfo; + private transient Field[] pojoFields; + + public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo); + } + + public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, createDefaultMask(pojoTypeInfo.getArity())); + } + + public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) { + this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity())); + } + + public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) { + this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length)); + } + + public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), toBooleanMask(includedFieldsMask)); + } + + public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) { + this(filePath, 
DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask); + } + + public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) { + this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask); + } + + public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) { + super(filePath); + boolean[] mask = (includedFieldsMask == null) + ? createDefaultMask(fieldNames.length) + : toBooleanMask(includedFieldsMask); + configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, mask); + } + + public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask); + } + + public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask); + } + + public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) { + this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask); + } + + public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) { + super(filePath); + configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, includedFieldsMask); + } + + private void configure(String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) { + + if (includedFieldsMask 
== null) { + includedFieldsMask = createDefaultMask(fieldNames.length); + } + + for (String name : fieldNames) { + if (name == null) { + throw new NullPointerException("Field name must not be null."); + } + if (pojoTypeInfo.getFieldIndex(name) < 0) { + throw new IllegalArgumentException("Field \"" + name + "\" not part of POJO type " + pojoTypeInfo.getTypeClass().getCanonicalName()); + } + } + + setDelimiter(lineDelimiter); + setFieldDelimiter(fieldDelimiter); + + Class<?>[] classes = new Class<?>[fieldNames.length]; + + for (int i = 0; i < fieldNames.length; i++) { + try { + classes[i] = pojoTypeInfo.getTypeAt(pojoTypeInfo.getFieldIndex(fieldNames[i])).getTypeClass(); + } catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException("Invalid field name: " + fieldNames[i]); + } + } + + this.pojoTypeClass = pojoTypeInfo.getTypeClass(); + this.pojoTypeInfo = pojoTypeInfo; + setFieldsGeneric(includedFieldsMask, classes); + setOrderOfPOJOFields(fieldNames); + } + + private void setOrderOfPOJOFields(String[] fieldNames) { + Preconditions.checkNotNull(fieldNames); + + int includedCount = 0; + for (boolean isIncluded : fieldIncluded) { + if (isIncluded) { + includedCount++; + } + } + + Preconditions.checkArgument(includedCount == fieldNames.length, includedCount + + " CSV fields and " + fieldNames.length + " POJO fields selected. 
The number of selected CSV and POJO fields must be equal."); + + for (String field : fieldNames) { + Preconditions.checkNotNull(field, "The field name cannot be null."); + Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1, + "Field \"" + field + "\" is not a member of POJO class " + pojoTypeClass.getName()); + } + + pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length); + } + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + + pojoFields = new Field[pojoFieldNames.length]; + + Map<String, Field> allFields = new HashMap<String, Field>(); + + findAllFields(pojoTypeClass, allFields); + + for (int i = 0; i < pojoFieldNames.length; i++) { + pojoFields[i] = allFields.get(pojoFieldNames[i]); + + if (pojoFields[i] != null) { + pojoFields[i].setAccessible(true); + } else { + throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName()); + } + } + } + + /** + * Finds all declared fields in a class and all its super classes. 
+ * + * @param clazz Class for which all declared fields are found + * @param allFields Map containing all found fields so far + */ + private void findAllFields(Class<?> clazz, Map<String, Field> allFields) { + for (Field field : clazz.getDeclaredFields()) { + allFields.put(field.getName(), field); + } + + if (clazz.getSuperclass() != null) { + findAllFields(clazz.getSuperclass(), allFields); + } + } + + @Override + public OUT fillRecord(OUT reuse, Object[] parsedValues) { + for (int i = 0; i < parsedValues.length; i++) { + try { + pojoFields[i].set(reuse, parsedValues[i]); + } catch (IllegalAccessException e) { + throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e); + } + } + return reuse; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java new file mode 100644 index 0000000..82caddd --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io; + +import org.apache.flink.core.fs.Path; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; + +public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> { + + private static final long serialVersionUID = 1L; + + private TupleSerializerBase<OUT> tupleSerializer; + + public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo); + } + + public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) { + this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity())); + } + + public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask); + } + + public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) { + super(filePath); + boolean[] mask = (includedFieldsMask == null) + ? 
createDefaultMask(tupleTypeInfo.getArity()) + : toBooleanMask(includedFieldsMask); + configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask); + } + + public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask); + } + + public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) { + super(filePath); + configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask); + } + + private void configure(String lineDelimiter, String fieldDelimiter, + TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) { + + if (tupleTypeInfo.getArity() == 0) { + throw new IllegalArgumentException("Tuple size must be greater than 0."); + } + + if (includedFieldsMask == null) { + includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity()); + } + + tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig()); + + setDelimiter(lineDelimiter); + setFieldDelimiter(fieldDelimiter); + + Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()]; + + for (int i = 0; i < tupleTypeInfo.getArity(); i++) { + classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass(); + } + + setFieldsGeneric(includedFieldsMask, classes); + } + + @Override + public OUT fillRecord(OUT reuse, Object[] parsedValues) { + return tupleSerializer.createOrReuseInstance(parsedValues, reuse); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java index a2d37ce..70b9393 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java @@ -469,19 +469,12 @@ class TupleGenerator { // create csv input format sb.append("\t\tCsvInputFormat<Tuple" + numFields + "<"); appendTupleTypeGenerics(sb, numFields); - sb.append(">> inputFormat = new CsvInputFormat<Tuple" + numFields + "<"); + sb.append(">> inputFormat = new TupleCsvInputFormat<Tuple" + numFields + "<"); appendTupleTypeGenerics(sb, numFields); - sb.append(">>(path, types);\n"); + sb.append(">>(path, types, this.includedMask);\n"); // configure input format - sb.append("\t\tconfigureInputFormat(inputFormat, "); - for (int i = 0; i < numFields; i++) { - if (i > 0) { - sb.append(", "); - } - sb.append("type" + i); - } - sb.append(");\n"); + sb.append("\t\tconfigureInputFormat(inputFormat);\n"); // return sb.append("\t\treturn new DataSource<Tuple" + numFields + "<"); http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index 46e3990..0897063 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -89,6 +89,14 @@ public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> { } @Override + public T createOrReuseInstance(Object[] fields, T reuse) { + for (int i = 0; i < arity; i++) { + reuse.setField(fields[i], i); + } + return reuse; + } + + @Override public T copy(T from) { T target = instantiateRaw(); for (int i = 0; i < arity; i++) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index bf3c7a1..fc657a1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -67,6 +67,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { // of immutable Tuples (i.e. Scala Tuples) public abstract T createInstance(Object[] fields); + public abstract T createOrReuseInstance(Object[] fields, T reuse); + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { for (int i = 0; i < arity; i++) {