wuchong commented on a change in pull request #8872: [FLINK-12977][table] Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299288657
########## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java ########## @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.RowCsvInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import 
java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + */ +public class CsvTableSource extends InputFormatTableSource<Row> implements + LookupableTableSource<Row>, ProjectableTableSource<Row>, BatchTableSource<Row> { + + private final CsvInputFormatConfig config; + + + /** + * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + * + * @param path The path to the CSV file. + * @param fieldNames The names of the table fields. + * @param fieldTypes The types of the table fields. + */ + public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes) { + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, + null, false, null, false); + } + + /** + * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + * + * @param path The path to the CSV file. + * @param fieldNames The names of the table fields. + * @param fieldTypes The types of the table fields. + * @param fieldDelim The field delimiter, "," by default. + * @param lineDelim The row delimiter, "\n" by default. + * @param quoteCharacter An optional quote character for String values, null by default. + * @param ignoreFirstLine Flag to ignore the first line, false by default. + * @param ignoreComments An optional prefix to indicate comments, null by default. + * @param lenient Flag to skip records with parse error instead to fail, false by default. 
+ */ + public CsvTableSource( + String path, + String[] fieldNames, + TypeInformation<?>[] fieldTypes, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + fieldDelim, lineDelim, + quoteCharacter, ignoreFirstLine, ignoreComments, lenient); + } + + /** + * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + * + * @param path The path to the CSV file. + * @param fieldNames The names of the table fields. + * @param fieldTypes The types of the table fields. + * @param selectedFields The fields which will be read and returned by the table source. + * If None, all fields are returned. + * @param fieldDelim The field delimiter, "," by default. + * @param lineDelim The row delimiter, "\n" by default. + * @param quoteCharacter An optional quote character for String values, null by default. + * @param ignoreFirstLine Flag to ignore the first line, false by default. + * @param ignoreComments An optional prefix to indicate comments, null by default. + * @param lenient Flag to skip records with parse error instead to fail, false by default. + */ + public CsvTableSource( + String path, + String[] fieldNames, + TypeInformation<?>[] fieldTypes, + int[] selectedFields, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, selectedFields, + fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient)); + } + + private CsvTableSource(CsvInputFormatConfig config) { + this.config = config; + } + + /** + * Return a new builder that builds a [[CsvTableSource]]. 
+ * For example: + * {{{ + * val source: CsvTableSource = CsvTableSource + * .builder() + * .path("/path/to/your/file.csv") + * .field("myfield", Types.STRING) + * .field("myfield2", Types.INT) + * .build() + * }}} + * @return a new builder to build a [[CsvTableSource]] + */ + public static Builder builder() { + return new Builder(); + } + + @Override + public TypeInformation<Row> getReturnType() { + return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames()); + } + + @Override + public InputFormat<Row, ?> getInputFormat() { + return config.createInputFormat(); + } + + @Override + public TableFunction<Row> getLookupFunction(String[] lookupKeys) { + return new CsvLookupFunction(config, lookupKeys); + } + + @Override + public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) { + throw new UnsupportedOperationException("CSV do not support async lookup"); + } + + @Override + public boolean isAsyncEnabled() { + return false; + } + + @Override + public TableSchema getTableSchema() { + return new TableSchema(config.fieldNames, config.fieldTypes); + } + + @Override + public CsvTableSource projectFields(int[] fields) { + if (fields.length == 0) { + fields = new int[]{0}; + } + return new CsvTableSource(config.select(fields)); + } + + @Override + public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) { + return execEnv.createInput(config.createInputFormat(), + new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames())) + .name(explainSource()); + } + + @Override + public String explainSource() { + String[] fields = config.getSelectedFieldNames(); + StringBuilder builder = new StringBuilder(); + builder.append("CsvTableSource(read fields: "); + boolean first = true; + for (String f : fields) { + if (!first) { + builder.append(", "); + } + builder.append(f); + first = false; + } + builder.append(")"); + return builder.toString(); + } + + /** + * A builder for creating [[CsvTableSource]] instances. 
+ * For example: + * {{{ + * val source: CsvTableSource = new CsvTableSource.builder() + * .path("/path/to/your/file.csv") + * .field("myfield", Types.STRING) + * .field("myfield2", Types.INT) + * .build() + * }}} + */ + public static class Builder { + private LinkedHashMap<String, TypeInformation<?>> schema = new LinkedHashMap<>(); + private Character quoteCharacter; + private String path; + private String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER; + private String lineDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER; + private boolean isIgnoreFirstLine = false; + private String commentPrefix; + private boolean lenient = false; + + /** + * Sets the path to the CSV file. Required. + * + * @param path the path to the CSV file + */ + public Builder path(String path) { + this.path = path; + return this; + } + + /** + * Sets the field delimiter, "," by default. + * + * @param delim the field delimiter + */ + public Builder fieldDelimiter(String delim) { + this.fieldDelim = delim; + return this; + } + + /** + * Sets the line delimiter, "\n" by default. + * + * @param delim the line delimiter + */ + public Builder lineDelimiter(String delim) { + this.lineDelim = delim; + return this; + } + + /** + * Adds a field with the field name and the type information. Required. This method can be + * called multiple times. The call order of this method defines also the order of the fields + * in a row. + * + * @param fieldName the field name + * @param fieldType the type information of the field + */ + public Builder field(String fieldName, TypeInformation<?> fieldType) { + if (schema.containsKey(fieldName)) { + throw new IllegalArgumentException("Duplicate field name " + fieldName); + } + schema.put(fieldName, fieldType); + return this; + } + + /** + * Sets a quote character for String values, null by default. 
+ * + * @param quote the quote character + */ + public Builder quoteCharacter(Character quote) { + this.quoteCharacter = quote; + return this; + } + + /** + * Sets a prefix to indicate comments, null by default. + * + * @param prefix the prefix to indicate comments + */ + public Builder commentPrefix(String prefix) { + this.commentPrefix = prefix; + return this; + } + + /** + * Ignore the first line. Not skip the first line by default. + */ + public Builder ignoreFirstLine() { + this.isIgnoreFirstLine = true; + return this; + } + + /** + * Skip records with parse error instead to fail. Throw an exception by default. + */ + public Builder ignoreParseErrors() { + this.lenient = true; + return this; + } + + /** + * Apply the current values and constructs a newly-created [[CsvTableSource]]. + * + * @return a newly-created [[CsvTableSource]]. + */ + public CsvTableSource build() { + if (path == null) { + throw new IllegalArgumentException("Path must be defined."); + } + if (schema.isEmpty()) { + throw new IllegalArgumentException("Fields can not be empty."); + } + return new CsvTableSource( + path, + schema.keySet().toArray(new String[0]), + schema.values().toArray(new TypeInformation<?>[0]), + fieldDelim, + lineDelim, + quoteCharacter, + isIgnoreFirstLine, + commentPrefix, + lenient); + } + + } + + /** + * LookupFunction to support lookup in [[CsvTableSource]]. 
+ */ + public static class CsvLookupFunction extends TableFunction<Row> { + private final CsvInputFormatConfig config; + + private final List<Integer> sourceKeys = new ArrayList<>(); + private final List<Integer> targetKeys = new ArrayList<>(); + private final Map<Object, List<Row>> one2manyDataMap = new HashMap<>(); + + CsvLookupFunction(CsvInputFormatConfig config, String[] lookupKeys) { + this.config = config; + + List<String> fields = Arrays.asList(config.getSelectedFieldNames()); + for (int i = 0; i < lookupKeys.length; i++) { + sourceKeys.add(i); + int targetIdx = fields.indexOf(lookupKeys[i]); + assert targetIdx != -1; + targetKeys.add(targetIdx); + } + } + + @Override + public TypeInformation<Row> getResultType() { + return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames()); + } + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + TypeInformation<Row> rowType = getResultType(); + TypeSerializer<Row> rowSerializer = rowType.createSerializer(new ExecutionConfig()); + + RowCsvInputFormat inputFormat = config.createInputFormat(); + FileInputSplit[] inputSplits = inputFormat.createInputSplits(1); + for (FileInputSplit split : inputSplits) { + inputFormat.open(split); + Row row = new Row(rowType.getArity()); + while (true) { + Row r = inputFormat.nextRecord(row); + if (r == null) { + break; + } else { + Object key = getTargetKey(r); + if (one2manyDataMap.containsKey(key)) { + one2manyDataMap.get(key).add(rowSerializer.copy(r)); + } else { + List<Row> rows = new ArrayList<>(); + rows.add(rowSerializer.copy(r)); + one2manyDataMap.put(key, rows); + } + } + } + inputFormat.close(); + } + } + + public void eval(Object... 
values) { + Object srcKey = getSourceKey(Row.of(values)); + if (one2manyDataMap.containsKey(srcKey)) { + for (Row row1 : one2manyDataMap.get(srcKey)) { + collect(row1); + } + } + } + + private Object getSourceKey(Row source) { + return getKey(source, sourceKeys); + } + + private Object getTargetKey(Row target) { + return getKey(target, targetKeys); + } + + private Object getKey(Row input, List<Integer> keys) { + if (keys.size() == 1) { + int keyIdx = keys.get(0); + if (input.getField(keyIdx) != null) { + return input.getField(keyIdx); + } + return null; + } else { + Row key = new Row(keys.size()); + for (int i = 0; i < keys.size(); i++) { + int keyIdx = keys.get(i); + Object field = null; + if (input.getField(keyIdx) != null) { + field = input.getField(keyIdx); + } + if (field == null) { + return null; + } + key.setField(i, field); + } + return key; + } + } + + @Override + public void close() throws Exception { + super.close(); + } + } + + private static class CsvInputFormatConfig implements Serializable { Review comment: missing `serialVersionUID`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
