JingsongLi commented on a change in pull request #9971: [FLINK-14490][table]
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339393920
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
##########
@@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) {
* Searches for the specified table source, configures it accordingly,
and registers it as
* a table under the given name.
*
+ * <p>Temporary objects can shadow permanent ones. If a permanent
object in a given path exists, it will
+ * be inaccessible in the current session. To make the permanent object
available again you can drop the
+ * corresponding temporary object.
+ *
* @param name table name to be registered in the table environment
+ * @deprecated use {@link #createTemporaryTable(String)}
*/
+ @Deprecated
public void registerTableSource(String name) {
Preconditions.checkNotNull(name);
TableSource<?> tableSource =
TableFactoryUtil.findAndCreateTableSource(this);
- tableEnv.registerTableSource(name, tableSource);
+ registration.createTableSource(name, tableSource);
}
/**
* Searches for the specified table sink, configures it accordingly,
and registers it as
* a table under the given name.
*
+ * <p>Temporary objects can shadow permanent ones. If a permanent
object in a given path exists, it will
+ * be inaccessible in the current session. To make the permanent object
available again you can drop the
+ * corresponding temporary object.
+ *
* @param name table name to be registered in the table environment
+ * @deprecated use {@link #createTemporaryTable(String)}
*/
+ @Deprecated
public void registerTableSink(String name) {
Preconditions.checkNotNull(name);
TableSink<?> tableSink =
TableFactoryUtil.findAndCreateTableSink(this);
- tableEnv.registerTableSink(name, tableSink);
+ registration.createTableSink(name, tableSink);
}
/**
* Searches for the specified table source and sink, configures them
accordingly, and registers
* them as a table under the given name.
*
+ * <p>Temporary objects can shadow permanent ones. If a permanent
object in a given path exists, it will
+ * be inaccessible in the current session. To make the permanent object
available again you can drop the
+ * corresponding temporary object.
+ *
* @param name table name to be registered in the table environment
+ * @deprecated use {@link #createTemporaryTable(String)}
*/
+ @Deprecated
public void registerTableSourceAndSink(String name) {
registerTableSource(name);
registerTableSink(name);
}
+ /**
+ * Registers the table described by underlying properties in a given
path.
+ *
+ * <p>There is no distinction between source and sink at the descriptor
level anymore as this
+ * method does not perform actual class lookup. It only stores the
underlying properties. The
+ * actual source/sink lookup is performed when the table is used.
+ *
+ * <p>Temporary objects can shadow permanent ones. If a permanent
object in a given path exists, it will
+ * be inaccessible in the current session. To make the permanent object
available again you can drop the
+ * corresponding temporary object.
+ *
+ * <p><b>NOTE:</b> The schema must be explicitly defined.
+ *
+ * @param path path where to register the temporary table
+ */
+ public void createTemporaryTable(String path) {
+ if (schemaDescriptor == null) {
+ throw new TableException(
+ "Table schema must be explicitly defined. To
derive schema from the underlying connector" +
+ " use
registerTableSource/registerTableSink/registerTableSourceAndSink.");
+ }
+
+ Map<String, String> schemaProperties =
schemaDescriptor.toProperties();
+ TableSchema tableSchema = getTableSchema(schemaProperties);
+
+ Map<String, String> properties = new HashMap<>(toProperties());
+ schemaProperties.keySet().forEach(properties::remove);
+
+ CatalogTableImpl catalogTable = new CatalogTableImpl(
Review comment:
`CatalogTable` has `partitionKeys` too, consider add partition keys in
`CatalogTableImpl.toProperties` and parse partition keys here?
(We can add later too)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services