[
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998985#comment-14998985
]
ASF GitHub Bot commented on FLINK-2692:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1266#discussion_r44438028
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Class<OUT> pojoTypeClass;
+
+ private String[] pojoFieldNames;
+
+ private transient PojoTypeInfo<OUT> pojoTypeInfo;
+ private transient Field[] pojoFields;
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT>
pojoTypeInfo) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
pojoTypeInfo);
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT>
pojoTypeInfo, String[] fieldNames) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
pojoTypeInfo, fieldNames, createDefaultMask(pojoTypeInfo.getArity()));
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String
fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+ this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo,
pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String
fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+ this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo,
fieldNames, createDefaultMask(fieldNames.length));
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT>
pojoTypeInfo, int[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
pojoTypeInfo, pojoTypeInfo.getFieldNames(), toBooleanMask(includedFieldsMask));
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT>
pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
pojoTypeInfo, fieldNames, includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String
fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+ this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo,
pojoTypeInfo.getFieldNames(), includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String
fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[]
includedFieldsMask) {
+ super(filePath);
+ boolean[] mask = (includedFieldsMask == null)
+ ? createDefaultMask(fieldNames.length)
+ : toBooleanMask(includedFieldsMask);
+ configure(lineDelimiter, fieldDelimiter, pojoTypeInfo,
fieldNames, mask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT>
pojoTypeInfo, boolean[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT>
pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
pojoTypeInfo, fieldNames, includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String
fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+ this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo,
pojoTypeInfo.getFieldNames(), includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String
fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[]
includedFieldsMask) {
+ super(filePath);
+ configure(lineDelimiter, fieldDelimiter, pojoTypeInfo,
fieldNames, includedFieldsMask);
+ }
+
+ private void configure(String lineDelimiter, String fieldDelimiter,
PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[]
includedFieldsMask) {
+
+ if (includedFieldsMask == null) {
+ includedFieldsMask = new boolean[fieldNames.length];
--- End diff --
Could this use `createDefaultMask(fieldNames.length)` instead, as the other constructors do?
> Untangle CsvInputFormat into PojoTypeCsvInputFormat and
> TupleTypeCsvInputFormat
> --------------------------------------------------------------------------------
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Minor
>
> The {{CsvInputFormat}} currently allows values to be returned as a {{Tuple}} or a
> {{Pojo}} type. As a consequence, the processing logic, which has to work for
> both types, is overly complex. For example, the {{CsvInputFormat}} contains
> fields which are only used when a Pojo is returned. Moreover, the pojo field
> information is constructed by calling setter methods which have to be called
> in a very specific order, otherwise they fail. E.g. one first has to call
> {{setFieldTypes}} before calling {{setOrderOfPOJOFields}}, otherwise the
> number of fields might be different. Furthermore, some of the methods can
> only be called if the return type is a {{Pojo}} type, because they expect
> that a {{PojoTypeInfo}} is present.
> I think the {{CsvInputFormat}} should be refactored to make the code more
> easily maintainable. I propose to split it up into a
> {{PojoTypeCsvInputFormat}} and a {{TupleTypeCsvInputFormat}} which take all
> the required information via their constructors instead of using the
> {{setFields}} and {{setOrderOfPOJOFields}} approach.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)