Github user KurtYoung commented on a diff in the pull request:
https://github.com/apache/flink/pull/3273#discussion_r100455645
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
---
@@ -138,4 +141,160 @@ class CsvTableSource(
inputFormat
}
+
+ override def equals(other: Any): Boolean = other match {
+ case that: CsvTableSource => returnType == that.returnType &&
+ path == that.path &&
+ fieldDelim == that.fieldDelim &&
+ rowDelim == that.rowDelim &&
+ quoteCharacter == that.quoteCharacter &&
+ ignoreFirstLine == that.ignoreFirstLine &&
+ ignoreComments == that.ignoreComments &&
+ lenient == that.lenient
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ val state = Seq(returnType)
+ state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ }
+}
+
+object CsvTableSource {
+
+ /**
+ * A builder for creating [[CsvTableSource]] instances.
+ *
+ * For example:
+ *
+ * {{{
+ * val source: CsvTableSource = new CsvTableSourceBuilder()
+ * .path("/path/to/your/file.csv")
+ * .field("myfield", Types.STRING)
+ * .field("myfield2", Types.INT)
+ * .build()
+ * }}}
+ *
+ */
+ class Builder {
+
+ private val fieldNames: ListBuffer[String] = ListBuffer[String]()
+ private val fieldTypes: ListBuffer[TypeInformation[_]] = ListBuffer[TypeInformation[_]]()
+ private var quoteCharacter: Character = _
+ private var path: String = _
+ private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
+ private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
+ private var isIgnoreFirstLine: Boolean = false
+ private var commentPrefix: String = _
+ private var lenient: Boolean = false
+
+
+ /**
+ * Sets the path to the CSV file.
+ * @param path the path to the CSV file
+ */
+ def path(path: String): Builder = {
+ this.path = path
+ this
+ }
+
+ /**
+ * Sets the field delimiter, "," by default.
+ * @param delim the field delimiter
+ */
+ def fieldDelimiter(delim: String): Builder = {
+ this.fieldDelim = delim
+ this
+ }
+
+ /**
+ * Sets the line delimiter, "\n" by default.
+ * @param delim the line delimiter
+ */
+ def lineDelimiter(delim: String): Builder = {
+ this.lineDelim = delim
+ this
+ }
+
+ /**
+ * Add a field with the field name and the type information.
+ * @param fieldName the field name
+ * @param fieldType the type information of the field
+ */
+ def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
+ this.fieldNames += fieldName
+ this.fieldTypes += fieldType
+ this
+ }
+
+ /**
+ * Sets a quote character for String values, null by default.
+ * @param quote the quote character
+ */
+ def quoteCharacter(quote: Character): Builder = {
+ this.quoteCharacter = quote
+ this
+ }
+
+ /**
+ * Sets a prefix to indicate comments, null by default.
+ * @param prefix the prefix to indicate comments
+ */
+ def commentPrefix(prefix: String): Builder = {
+ this.commentPrefix = prefix
+ this
+ }
+
+ /**
+ * Ignore the first line. Not skip the first line by default.
+ */
+ def ignoreFirstLine: Builder = {
+ this.isIgnoreFirstLine = true
+ this
+ }
+
+ /**
+ * Skip records with parse error instead to fail. Throw an exception by default.
+ */
+ def ignoreParseErrors: Builder = {
+ this.lenient = true
+ this
+ }
+
+ /**
+ * Apply the current values and constructs a newly-created [[CsvTableSource]].
+ * @return a newly-created [[CsvTableSource]].
+ */
+ def build: CsvTableSource = {
+ Preconditions.checkNotNull(path, "Path must not be null.")
--- End diff --
Shouldn't `build` also check that at least one field has been added, i.e. that the field list is not empty?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---