twalthr commented on a change in pull request #16788:
URL: https://github.com/apache/flink/pull/16788#discussion_r689301705
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
##########
@@ -34,25 +34,82 @@
* properties may include execution configurations such as watermark interval, max parallelism etc.,
* table specific initialization configuration such as if the queries should be executed in batch
* mode.
+ *
+ * <p>Implementations should use the canonical class name ({@link Class#getCanonicalName()}) of the
+ * {@link PlannerFactory} as the {@link #factoryIdentifier()}.
*/
@Internal
-public interface PlannerFactory extends ComponentFactory {
-
- /**
- * Creates a corresponding {@link Planner}.
- *
- * @param properties Static properties of the {@link Planner}, the same that were used for
- *     factory lookup.
- * @param executor The executor required by the planner.
- * @param tableConfig The configuration of the planner to use.
- * @param functionCatalog The function catalog to look up user defined functions.
- * @param catalogManager The catalog manager to look up tables and views.
- * @return instance of a {@link Planner}
- */
- Planner create(
- Map<String, String> properties,
- Executor executor,
- TableConfig tableConfig,
- FunctionCatalog functionCatalog,
- CatalogManager catalogManager);
+public interface PlannerFactory extends Factory {
+
+ default String factoryIdentifier() {
+ return getClass().getCanonicalName();
Review comment:
can we align this with the executor factory?
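For illustration, aligning the two could mean exposing a fixed identifier constant instead of deriving it from the class name. A minimal sketch; the constant name and value are assumptions, not taken from the actual executor factory:

```java
import org.apache.flink.table.factories.Factory;

// Sketch only: mirror the executor factory with a fixed identifier constant.
// DEFAULT_IDENTIFIER (name and value) is an assumption for illustration.
public interface PlannerFactorySketch extends Factory {

    String DEFAULT_IDENTIFIER = "default";

    @Override
    default String factoryIdentifier() {
        return DEFAULT_IDENTIFIER;
    }
}
```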
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.delegation.PlannerFactory.Context;
+import org.apache.flink.table.delegation.PlannerFactory.DefaultPlannerContext;
+
+import java.util.Map;
+
+/** Utility for discovering and instantiating {@link PlannerFactory}. */
+@Internal
+public class PlannerFactoryUtil {
Review comment:
Rename to `DelegationUtil` and add the methods for finding the executor as well. Currently, this logic exists only as a method in the various `TableEnvironmentImpl` classes.
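A rough sketch of what such a combined utility could look like, assuming `FactoryUtil.discoverFactory` is used for the lookup; the executor-related method and its `create(Configuration)` call are assumptions for illustration, not the actual `ExecutorFactory` API:

```java
package org.apache.flink.table.factories;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.delegation.PlannerFactory.Context;

/** Sketch of a utility that discovers both the planner and the executor. */
@Internal
public final class DelegationUtil {

    /** Looks up a {@link PlannerFactory} via SPI and creates the planner. */
    public static Planner discoverAndCreatePlanner(String identifier, Context context) {
        final PlannerFactory factory =
                FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        PlannerFactory.class,
                        identifier);
        return factory.create(context);
    }

    /** Looks up an {@link ExecutorFactory}; the create(Configuration) signature is assumed. */
    public static Executor discoverAndCreateExecutor(String identifier, Configuration configuration) {
        final ExecutorFactory factory =
                FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        ExecutorFactory.class,
                        identifier);
        return factory.create(configuration);
    }

    private DelegationUtil() {}
}
```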
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
##########
@@ -225,9 +225,9 @@ public String getExecutor() {
@Internal
public Map<String, String> toPlannerProperties() {
Review comment:
We don't need the `toPlannerProperties` method anymore. The refactoring can be similar to FLINK-23482.
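As a sketch of the direction, the settings could be translated into a typed `Configuration` rather than a `Map<String, String>`; the `toPlannerConfiguration` helper below is hypothetical, while `ExecutionOptions.RUNTIME_MODE` and `EnvironmentSettings#isStreamingMode` are existing API as far as I can tell:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.EnvironmentSettings;

/** Sketch: replace the Map<String, String> bridge with a typed Configuration. */
public final class PlannerPropertiesSketch {

    // Hypothetical replacement for toPlannerProperties(): translate the settings
    // into typed options that the planner factory can read directly.
    static Configuration toPlannerConfiguration(EnvironmentSettings settings) {
        final Configuration configuration = new Configuration();
        configuration.set(
                ExecutionOptions.RUNTIME_MODE,
                settings.isStreamingMode()
                        ? RuntimeExecutionMode.STREAMING
                        : RuntimeExecutionMode.BATCH);
        return configuration;
    }
}
```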
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/BlinkPlannerFactory.java
##########
@@ -19,53 +19,43 @@
package org.apache.flink.table.planner.delegation;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
+import java.util.Set;
/** Factory to construct a {@link BatchPlanner} or {@link StreamPlanner}. */
@Internal
public final class BlinkPlannerFactory implements PlannerFactory {
@Override
- public Planner create(
- Map<String, String> properties,
- Executor executor,
- TableConfig tableConfig,
- FunctionCatalog functionCatalog,
- CatalogManager catalogManager) {
- if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.STREAMING_MODE, "true"))) {
- return new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager);
- } else {
- return new BatchPlanner(executor, tableConfig, functionCatalog, catalogManager);
- }
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
}
@Override
- public Map<String, String> optionalContext() {
- Map<String, String> map = new HashMap<>();
- map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
- return map;
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
}
@Override
- public Map<String, String> requiredContext() {
- DescriptorProperties properties = new DescriptorProperties();
- return properties.asMap();
- }
-
- @Override
- public List<String> supportedProperties() {
- return Arrays.asList(EnvironmentSettings.STREAMING_MODE, EnvironmentSettings.CLASS_NAME);
+ public Planner create(Context context) {
+ if (Boolean.parseBoolean(
+ context.getOptions().getOrDefault(EnvironmentSettings.STREAMING_MODE, "true"))) {
Review comment:
The runtime mode can be read directly from the configuration or be exposed as its own flag in the context.
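A minimal sketch of the first option, reading the typed runtime mode instead of the `STREAMING_MODE` string property; how the factory obtains the `ReadableConfig` (e.g. from the context) is an assumption:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;

/** Sketch: decide between stream and batch planner from execution.runtime-mode. */
final class RuntimeModeSketch {

    static boolean isStreamingMode(ReadableConfig configuration) {
        // ExecutionOptions.RUNTIME_MODE defaults to STREAMING when not set.
        return configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
    }

    public static void main(String[] args) {
        final Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        System.out.println(isStreamingMode(configuration) ? "StreamPlanner" : "BatchPlanner");
    }
}
```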
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/BlinkPlannerFactory.java
##########
@@ -19,53 +19,43 @@
package org.apache.flink.table.planner.delegation;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
+import java.util.Set;
/** Factory to construct a {@link BatchPlanner} or {@link StreamPlanner}. */
@Internal
public final class BlinkPlannerFactory implements PlannerFactory {
Review comment:
rename to `DefaultPlannerFactory`