twalthr commented on a change in pull request #16287:
URL: https://github.com/apache/flink/pull/16287#discussion_r663837728
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
##########
@@ -701,6 +701,27 @@ void createFunction(
*/
Table from(String path);
+ /**
+ * Returns a {@link Table} backed by the given {@link TableDescriptor descriptor}.
+ *
+ * <p>The {@link TableDescriptor descriptor} is registered as a temporary table (see {@link
Review comment:
We should add a bit more documentation here, something along these lines:
```
The {@link TableDescriptor descriptor} is registered as an inline (i.e.
anonymous) temporary table (see {@link #createTemporaryTable(String,
TableDescriptor)}) using a unique identifier and then read. Note that calling
this method multiple times, even with the same descriptor, results in multiple
temporary tables. In such cases, it is recommended to register it under a name
using #createTemporaryTable(String, TableDescriptor) and reference it via
from(String).
```
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -551,6 +551,17 @@ public Table from(String path) {
"Table %s was not found.",
unresolvedIdentifier)));
}
+ @Override
+ public Table from(TableDescriptor descriptor) {
+ final String path = TableDescriptorUtil.getUniqueAnonymousPath();
+
+ final ObjectIdentifier identifier =
+ catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(path));
+ createTemporaryTable(identifier.toObjectPath().getFullName(), descriptor);
Review comment:
Calling `toObjectPath().getFullName()` and then re-parsing the resulting
string seems error-prone to me. How about introducing a
`createTemporaryTableInternal` that takes an `UnresolvedIdentifier` and is
used by both `from` and `createTemporaryTable`?
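To illustrate the idea, here is a minimal, self-contained sketch. It uses simplified stand-in types (`InlineTableSketch`, `Identifier`, and `Descriptor` are hypothetical and not the real Flink classes), and the generated name is only a placeholder. The point is that both entry points delegate to one internal method that receives an already-parsed identifier, so a generated name never goes through a string round trip:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in types only: simplified placeholders, NOT the real Flink classes.
final class InlineTableSketch {

    /** Stand-in for an unresolved identifier: just the parsed path segments. */
    static final class Identifier {
        final List<String> segments;

        Identifier(String... segments) {
            this.segments = Arrays.asList(segments);
        }
    }

    /** Stand-in for a table descriptor. */
    static final class Descriptor {
        final String connector;

        Descriptor(String connector) {
            this.connector = connector;
        }
    }

    private final Map<List<String>, Descriptor> temporaryTables = new HashMap<>();
    private int counter = 0;

    /** Public entry point: parses the user-supplied path exactly once. */
    void createTemporaryTable(String path, Descriptor descriptor) {
        // Naive split for illustration; a real parser also handles escaping.
        createTemporaryTableInternal(new Identifier(path.split("\\.")), descriptor);
    }

    /** Inline entry point: builds the identifier directly, without re-parsing. */
    Identifier from(Descriptor descriptor) {
        // Hypothetical generated name; it deliberately contains a dot.
        Identifier identifier = new Identifier("inline.table$" + (counter++));
        createTemporaryTableInternal(identifier, descriptor);
        return identifier;
    }

    /** Shared logic that operates on the already-parsed identifier. */
    private void createTemporaryTableInternal(Identifier identifier, Descriptor descriptor) {
        if (temporaryTables.putIfAbsent(identifier.segments, descriptor) != null) {
            throw new IllegalStateException("Table already exists: " + identifier.segments);
        }
    }

    Descriptor lookup(Identifier identifier) {
        return temporaryTables.get(identifier.segments);
    }
}
```

With this shape, a generated name that happens to contain a dot stays a single segment, whereas formatting it to a string and parsing it again would split it in two.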
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
##########
@@ -701,6 +701,27 @@ void createFunction(
*/
Table from(String path);
+ /**
+ * Returns a {@link Table} backed by the given {@link TableDescriptor descriptor}.
+ *
+ * <p>The {@link TableDescriptor descriptor} is registered as a temporary table (see {@link
+ * #createTemporaryTable(String, TableDescriptor)}) and then read.
+ *
+ * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or {@link
Review comment:
Drop this, as there is no path that the user can influence here.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableDescriptorUtil.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Utilities for working with {@link TableDescriptor}. */
+@Internal
+class TableDescriptorUtil {
Review comment:
nit: if we don't add more methods here, we could instead have a general
"inline" ID utility that is also used by `to/fromDataStream`,
`Table.toString()`, etc., where we also need to generate those unique IDs.
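As a rough sketch of what such a shared utility could look like (the class name `InlineIdSketch`, the method `nextId`, and the `kind$n` name format are placeholders for illustration, not an existing Flink API):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; class name, method name, and id format are placeholders.
final class InlineIdSketch {

    // One process-wide counter shared by all callers that need an inline id.
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    private InlineIdSketch() {
        // Utility class, not meant to be instantiated.
    }

    /** Returns a unique id that starts with the given kind followed by '$'. */
    static String nextId(String kind) {
        return kind + "$" + COUNTER.getAndIncrement();
    }
}
```

A single atomic counter keeps the ids unique across threads and across the different call sites, so each caller only supplies a prefix describing what the id is for.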
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
##########
@@ -548,4 +548,20 @@ class CalcITCase extends StreamingTestBase {
TestBaseUtils.compareResultAsText(result, "42")
}
+ @Test
+ def testTableFromDescriptor(): Unit = {
Review comment:
Use a non-IT case instead; it can be located in the API module.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.