docete commented on a change in pull request #11276:
[FLINK-16029][table-planner-blink] Remove register source and sink in test
cases of blink planner
URL: https://github.com/apache/flink/pull/11276#discussion_r402721904
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
##########
@@ -73,121 +79,72 @@ class InMemoryLookupableTableSource(
map.toMap
}
+ override def getDataStream(execEnv: StreamExecutionEnvironment):
DataStream[Row] = {
+ null
+ }
+
override def isAsyncEnabled: Boolean = asyncEnabled
- override def getReturnType: TypeInformation[Row] = new
RowTypeInfo(fieldTypes, fieldNames)
+ override def getProducedDataType: DataType = schema.toRowDataType
+
+ override def getTableSchema: TableSchema = schema
- override def getTableSchema: TableSchema = new TableSchema(fieldNames,
fieldTypes)
+
+ override def isBounded: Boolean = bounded
@VisibleForTesting
def getResourceCounter: Int = resourceCounter.get()
-
}
-object InMemoryLookupableTableSource {
-
- /**
- * Return a new builder that builds a [[InMemoryLookupableTableSource]].
- *
- * For example:
- *
- * {{{
- * val data = (
- * (11, 1L, "Julian"),
- * (22, 2L, "Jark"),
- * (33, 3L, "Fabian"))
- *
- * val source = InMemoryLookupableTableSource.builder()
- * .data(data)
- * .field("age", Types.INT)
- * .field("id", Types.LONG)
- * .field("name", Types.STRING)
- * .enableAsync()
- * .build()
- * }}}
- *
- * @return a new builder to build a [[InMemoryLookupableTableSource]]
- */
- def builder(): Builder = new Builder
+class InMemoryLookupableTableFactory extends TableSourceFactory[Row] {
+ override def createTableSource(properties: util.Map[String, String]):
TableSource[Row] = {
+ val dp = new DescriptorProperties
+ dp.putProperties(properties)
+ val tableSchema = dp.getTableSchema(SCHEMA)
+ val serializedData = dp.getString("data")
+ val data = EncodingUtils.decodeStringToObject(serializedData,
classOf[List[Product]])
- /**
- * A builder for creating [[InMemoryLookupableTableSource]] instances.
- *
- * For example:
- *
- * {{{
- * val data = (
- * (11, 1L, "Julian"),
- * (22, 2L, "Jark"),
- * (33, 3L, "Fabian"))
- *
- * val source = InMemoryLookupableTableSource.builder()
- * .data(data)
- * .field("age", Types.INT)
- * .field("id", Types.LONG)
- * .field("name", Types.STRING)
- * .enableAsync()
- * .build()
- * }}}
- */
- class Builder {
- private val schema = new mutable.LinkedHashMap[String,
TypeInformation[_]]()
- private var data: List[Product] = _
- private var asyncEnabled: Boolean = false
-
- /**
- * Sets table data for the table source.
- */
- def data(data: List[Product]): Builder = {
- this.data = data
- this
+ val rowData = data.map { entry =>
+ Row.of((0 until
entry.productArity).map(entry.productElement(_).asInstanceOf[Object]): _*)
}
- /**
- * Adds a field with the field name and the type information. Required.
- * This method can be called multiple times. The call order of this
method defines
- * also the order of the fields in a row.
- *
- * @param fieldName the field name
- * @param fieldType the type information of the field
- */
- def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
- if (schema.contains(fieldName)) {
- throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
- }
- schema += (fieldName -> fieldType)
- this
- }
+ val asyncEnabled = dp.getOptionalBoolean("is-async").orElse(false)
- /**
- * Enables async lookup for the table source
- */
- def enableAsync(): Builder = {
- asyncEnabled = true
- this
- }
+ val bounded = dp.getOptionalBoolean("is-bounded").orElse(false)
- /**
- * Apply the current values and constructs a newly-created
[[InMemoryLookupableTableSource]].
- *
- * @return a newly-created [[InMemoryLookupableTableSource]].
- */
- def build(): InMemoryLookupableTableSource = {
- val fieldNames = schema.keys.toArray
- val fieldTypes = schema.values.toArray
- Preconditions.checkNotNull(data)
- // convert
- val rowData = data.map { entry =>
- Row.of((0 until
entry.productArity).map(entry.productElement(_).asInstanceOf[Object]): _*)
- }
- new InMemoryLookupableTableSource(
- fieldNames,
- fieldTypes,
- rowData,
- asyncEnabled
- )
- }
+ new InMemoryLookupableTableSource(tableSchema, rowData, asyncEnabled,
bounded)
+ }
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, "InMemoryLookupableTable")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val supported = new util.ArrayList[String]()
+ supported.add("*")
+ supported
+ }
+}
+
+object InMemoryLookupableTableSource {
+
+ def createTemporaryTable(
Review comment:
The name `createTemporaryTable` does not suggest that the method returns
something. See `TableEnvironment.createTemporaryView` or
`ConnectorTableDescriptor.createTemporaryTable`. I think we should use
unified naming.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services