beyond1920 commented on a change in pull request #6906: [Flink-6036][table]Let
catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r228072144
##########
File path:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
##########
@@ -282,34 +296,63 @@ class ExternalCatalogTableBuilder(private val
connectorDescriptor: ConnectorDesc
this
}
+ /**
+ * Specifies the partition columns for this external table.
+ */
+ def withPartitionColumnNames(
+ partitionColumnNames: java.util.LinkedHashSet[String]):
ExternalCatalogTableBuilder = {
+ require(partitionColumnNames != null && !partitionColumnNames.isEmpty)
+ this.partitionColumnNames = Some(partitionColumnNames)
+ this
+ }
+
/**
* Declares this external table as a table source and returns the
* configured [[ExternalCatalogTable]].
*
* @return External catalog table
*/
- def asTableSource(): ExternalCatalogTable = {
- new ExternalCatalogTable(
- isBatch,
- isStreaming,
- isSource = true,
- isSink = false,
- DescriptorProperties.toJavaMap(this))
- }
+ def asTableSource(): ExternalCatalogTable = this.partitionColumnNames match {
+ case Some(pc) =>
+ new ExternalCatalogPartitionedTable(
+ isBatch,
+ isStreaming,
+ isSource = true,
+ isSink = false,
+ pc,
+ DescriptorProperties.toJavaMap(this)
+ )
+ case None =>
+ new ExternalCatalogTable(
+ isBatch,
+ isStreaming,
+ isSource = true,
+ isSink = false,
+ DescriptorProperties.toJavaMap(this))
+ }
/**
* Declares this external table as a table sink and returns the
* configured [[ExternalCatalogTable]].
*
* @return External catalog table
*/
- def asTableSink(): ExternalCatalogTable = {
- new ExternalCatalogTable(
- isBatch,
- isStreaming,
- isSource = false,
- isSink = true,
- DescriptorProperties.toJavaMap(this))
+ def asTableSink(): ExternalCatalogTable = this.partitionColumnNames match {
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services