wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299287964
 
 

 ##########
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##########
 @@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV 
files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource<Row> implements
+       LookupableTableSource<Row>, ProjectableTableSource<Row>, 
BatchTableSource<Row> {
+
+       private final CsvInputFormatConfig config;
+
+
+       /**
+        * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+        * (logically) unlimited number of fields.
+        *
+        * @param path The path to the CSV file.
+        * @param fieldNames The names of the table fields.
+        * @param fieldTypes The types of the table fields.
+        */
+       public CsvTableSource(String path, String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+               this(path, fieldNames, fieldTypes,
+                       IntStream.range(0, fieldNames.length).toArray(),
+                       CsvInputFormat.DEFAULT_FIELD_DELIMITER, 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
+                       null, false, null, false);
+       }
+
+       /**
+        * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+        * (logically) unlimited number of fields.
+        *
+        * @param path The path to the CSV file.
+        * @param fieldNames The names of the table fields.
+        * @param fieldTypes The types of the table fields.
+        * @param fieldDelim The field delimiter, "," by default.
+        * @param lineDelim The row delimiter, "\n" by default.
+        * @param quoteCharacter An optional quote character for String values, 
null by default.
+        * @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+        * @param ignoreComments An optional prefix to indicate comments, null 
by default.
+        * @param lenient Flag to skip records with parse error instead to 
fail, false by default.
+        */
+       public CsvTableSource(
+               String path,
+               String[] fieldNames,
+               TypeInformation<?>[] fieldTypes,
+               String fieldDelim,
+               String lineDelim,
+               Character quoteCharacter,
+               boolean ignoreFirstLine,
+               String ignoreComments,
+               boolean lenient) {
+
+               this(path, fieldNames, fieldTypes,
+                       IntStream.range(0, fieldNames.length).toArray(),
+                       fieldDelim, lineDelim,
+                       quoteCharacter, ignoreFirstLine, ignoreComments, 
lenient);
+       }
+
+       /**
+        * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+        * (logically) unlimited number of fields.
+        *
+        * @param path The path to the CSV file.
+        * @param fieldNames The names of the table fields.
+        * @param fieldTypes The types of the table fields.
+        * @param selectedFields The fields which will be read and returned by 
the table source.
+        *                       If None, all fields are returned.
+        * @param fieldDelim The field delimiter, "," by default.
+        * @param lineDelim The row delimiter, "\n" by default.
+        * @param quoteCharacter An optional quote character for String values, 
null by default.
+        * @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+        * @param ignoreComments An optional prefix to indicate comments, null 
by default.
+        * @param lenient Flag to skip records with parse error instead to 
fail, false by default.
+        */
+       public CsvTableSource(
+               String path,
+               String[] fieldNames,
+               TypeInformation<?>[] fieldTypes,
+               int[] selectedFields,
+               String fieldDelim,
+               String lineDelim,
+               Character quoteCharacter,
+               boolean ignoreFirstLine,
+               String ignoreComments,
+               boolean lenient) {
+               this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, 
selectedFields,
+                       fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, 
ignoreComments, lenient));
+       }
+
+       private CsvTableSource(CsvInputFormatConfig config) {
+               this.config = config;
+       }
+
+       /**
+        * Return a new builder that builds a [[CsvTableSource]].
+        * For example:
+        * {{{
+        *   val source: CsvTableSource = CsvTableSource
+        *     .builder()
+        *     .path("/path/to/your/file.csv")
+        *     .field("myfield", Types.STRING)
+        *     .field("myfield2", Types.INT)
+        *     .build()
+        * }}}
+        * @return a new builder to build a [[CsvTableSource]]
+        */
+       public static Builder builder() {
+               return new Builder();
+       }
+
+       @Override
+       public TypeInformation<Row> getReturnType() {
+               return new RowTypeInfo(config.getSelectedFieldTypes(), 
config.getSelectedFieldNames());
+       }
+
+       @Override
+       public InputFormat<Row, ?> getInputFormat() {
+               return config.createInputFormat();
+       }
+
+       @Override
+       public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
+               return new CsvLookupFunction(config, lookupKeys);
+       }
+
+       @Override
+       public AsyncTableFunction<Row> getAsyncLookupFunction(String[] 
lookupKeys) {
+               throw new UnsupportedOperationException("CSV do not support 
async lookup");
+       }
+
+       @Override
+       public boolean isAsyncEnabled() {
+               return false;
+       }
+
+       @Override
+       public TableSchema getTableSchema() {
+               return new TableSchema(config.fieldNames, config.fieldTypes);
+       }
+
+       @Override
+       public CsvTableSource projectFields(int[] fields) {
+               if (fields.length == 0) {
+                       fields = new int[]{0};
+               }
+               return new CsvTableSource(config.select(fields));
+       }
+
+       @Override
+       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+               return execEnv.createInput(config.createInputFormat(),
+                       new RowTypeInfo(config.getSelectedFieldTypes(), 
config.getSelectedFieldNames()))
+                       .name(explainSource());
+       }
+
+       @Override
+       public String explainSource() {
+               String[] fields = config.getSelectedFieldNames();
+               StringBuilder builder = new StringBuilder();
+               builder.append("CsvTableSource(read fields: ");
+               boolean first = true;
+               for (String f : fields) {
+                       if (!first) {
+                               builder.append(", ");
+                       }
+                       builder.append(f);
+                       first = false;
+               }
 
 Review comment:
   This can be replaced by `org.apache.commons.lang3.StringUtils.join(fields, ", ");`
   or `Arrays.stream(fields).collect(Collectors.joining(", "));`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to