KurtYoung commented on a change in pull request #8844:
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r299835877
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -245,6 +247,57 @@ public String explain(Table table) {
return planner.getCompletionHints(statement, position);
}
+ @Override
+ public void sql(String statement) {
+ List<Operation> operations = planner.parse(statement);
+ operations.forEach(operation -> {
+ if (operation instanceof CreateTableOperation) {
+ CreateTableOperation operation1 =
(CreateTableOperation) operation;
+ registerTable(
+ operation1.getTablePath(),
+ operation1.getCatalogTable(),
+ operation1.isIgnoreIfExists());
+ } else if (operation instanceof ModifyOperation) {
+ queryConfigProvider.setConfig(new
StreamQueryConfig());
+ List<Transformation<?>> transformations =
+
planner.translate(Collections.singletonList((ModifyOperation) operation));
+
+ execEnv.apply(transformations);
+ } else {
+ throw new ValidationException(
+ "Unsupported SQL statement: sql() only
accepts DDLs or Inserts.");
+ }
+ });
+ }
+
+ /**
+ * Registers a {@link CatalogBaseTable} under a given object path. The
{@code path} could be
+ * 3 formats:
+ * <ol>
+ * <li>`catalog.db.table`: A full table path including the catalog
name,
+ * the database name and table name.</li>
+ * <li>`db.table`: database name following table name, with the
current catalog name.</li>
+ * <li>`table`: Only the table name, with the current catalog name
and database name.</li>
+ * </ol>
+ * The registered tables then can be referenced in Sql queries.
+ *
+ * @param path The path under which the table will be
registered
+ * @param catalogTable The table to register
+ * @param ignoreIfExists If true, do nothing if there is already same
table name under
+ * the {@code path}. If false, a
TableAlreadyExistException throws.
+ */
+ private void registerTable(String[] path, CatalogBaseTable
catalogTable, boolean ignoreIfExists) {
Review comment:
you can try to extend current `registerTableInternal` function to meet your
requirement?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services