wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299391290
 
 

 ##########
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##########
 @@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link InputFormatTableSource} and {@link LookupableTableSource} for 
simple CSV files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource<Row> implements
+       LookupableTableSource<Row>, ProjectableTableSource<Row>, 
BatchTableSource<Row> {
+
+       private final CsvInputFormatConfig config;
+
+
+       /**
+        * A {@link InputFormatTableSource} and {@link LookupableTableSource} 
for simple CSV files with
+        * a (logically) unlimited number of fields.
+        *
+        * @param path       The path to the CSV file.
+        * @param fieldNames The names of the table fields.
+        * @param fieldTypes The types of the table fields.
+        */
+       public CsvTableSource(String path, String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+               this(path, fieldNames, fieldTypes,
+                       IntStream.range(0, fieldNames.length).toArray(),
+                       CsvInputFormat.DEFAULT_FIELD_DELIMITER, 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
+                       null, false, null, false);
+       }
+
+       /**
+        * A {@link InputFormatTableSource} and {@link LookupableTableSource} 
for simple CSV files with
+        * a (logically) unlimited number of fields.
+        *
+        * @param path            The path to the CSV file.
+        * @param fieldNames      The names of the table fields.
+        * @param fieldTypes      The types of the table fields.
+        * @param fieldDelim      The field delimiter, "," by default.
+        * @param lineDelim       The row delimiter, "\n" by default.
+        * @param quoteCharacter  An optional quote character for String 
values, null by default.
+        * @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+        * @param ignoreComments  An optional prefix to indicate comments, null 
by default.
+        * @param lenient         Flag to skip records with parse error instead 
to fail, false by
+        *                        default.
+        */
+       public CsvTableSource(
+               String path,
+               String[] fieldNames,
+               TypeInformation<?>[] fieldTypes,
+               String fieldDelim,
+               String lineDelim,
+               Character quoteCharacter,
+               boolean ignoreFirstLine,
+               String ignoreComments,
+               boolean lenient) {
+
+               this(path, fieldNames, fieldTypes,
+                       IntStream.range(0, fieldNames.length).toArray(),
+                       fieldDelim, lineDelim,
+                       quoteCharacter, ignoreFirstLine, ignoreComments, 
lenient);
+       }
+
+       /**
+        * A {@link InputFormatTableSource} and {@link LookupableTableSource} 
for simple CSV files with
+        * a (logically) unlimited number of fields.
+        *
+        * @param path            The path to the CSV file.
+        * @param fieldNames      The names of the table fields.
+        * @param fieldTypes      The types of the table fields.
+        * @param selectedFields  The fields which will be read and returned by 
the table source. If
+        *                        None, all fields are returned.
+        * @param fieldDelim      The field delimiter, "," by default.
+        * @param lineDelim       The row delimiter, "\n" by default.
+        * @param quoteCharacter  An optional quote character for String 
values, null by default.
+        * @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+        * @param ignoreComments  An optional prefix to indicate comments, null 
by default.
+        * @param lenient         Flag to skip records with parse error instead 
to fail, false by
+        *                        default.
+        */
+       public CsvTableSource(
+               String path,
+               String[] fieldNames,
+               TypeInformation<?>[] fieldTypes,
+               int[] selectedFields,
+               String fieldDelim,
+               String lineDelim,
+               Character quoteCharacter,
+               boolean ignoreFirstLine,
+               String ignoreComments,
+               boolean lenient) {
+               this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, 
selectedFields,
+                       fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, 
ignoreComments, lenient));
+       }
+
+       private CsvTableSource(CsvInputFormatConfig config) {
+               this.config = config;
+       }
+
+       /**
+        * Return a new builder that builds a CsvTableSource. For example: {{{ 
val source:
 
 Review comment:
   `{{{` is a scaladoc, please use `<pre></pre>` in javadoc, and please pay 
attention to the line break.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to