KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module. URL: https://github.com/apache/flink/pull/8294#discussion_r301863478
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/PlannerConfigImplUtils.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.util; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.PlannerConfig; +import org.apache.flink.table.api.PlannerConfigImpl; +import org.apache.flink.table.api.TableConfig; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import scala.concurrent.duration.Duration; + +import static org.apache.flink.table.api.ExecutionConfigOptions.SQL_EXEC_DISABLED_OPERATORS; +import static org.apache.flink.table.api.ExecutionConfigOptions.SQL_EXEC_EMIT_EARLY_FIRE_DELAY; +import static org.apache.flink.table.api.ExecutionConfigOptions.SQL_EXEC_EMIT_EARLY_FIRE_ENABLED; +import static org.apache.flink.table.api.ExecutionConfigOptions.SQL_EXEC_EMIT_LATE_FIRE_DELAY; +import static org.apache.flink.table.api.ExecutionConfigOptions.SQL_EXEC_EMIT_LATE_FIRE_ENABLED; +import static org.apache.flink.table.api.OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY; + +/** + * Utility class for {@link PlannerConfigImpl} related helper functions. + */ +public class PlannerConfigImplUtils { + + /** + * Specifies the early firing interval in milli second, early fire is the emit strategy before watermark advanced + * to end of window. + * + * @param plannerConfig configuration to update. + * @param interval The early firing interval + */ + public static PlannerConfig withEarlyFireDelay(PlannerConfigImpl plannerConfig, Time interval) { + long intervalInMillis = interval.toMilliseconds(); + Long preEarlyFireInterval = getMillisecondFromConfigDuration(plannerConfig, SQL_EXEC_EMIT_EARLY_FIRE_DELAY); + if (preEarlyFireInterval != null && preEarlyFireInterval != intervalInMillis) { + // earlyFireInterval of the two query config is not equal and not the default + throw new RuntimeException("Currently not support different earlyFireInterval configs in one job"); + } + plannerConfig.setBoolean(SQL_EXEC_EMIT_EARLY_FIRE_ENABLED, true); + plannerConfig.setString(SQL_EXEC_EMIT_EARLY_FIRE_DELAY, intervalInMillis + " ms"); + return plannerConfig; + } + + /** + * Returns the early firing interval in milli second. + * + * @param plannerConfig the configuration object + * @return the early firing interval in milli second. + */ + public static Long getEarlyFireDelay(PlannerConfigImpl plannerConfig) { + return getMillisecondFromConfigDuration(plannerConfig, SQL_EXEC_EMIT_EARLY_FIRE_DELAY); + } + + /** + * Specifies the late firing delay in milli second, early fire is the emit strategy after watermark advanced to + * end of window. + * + * @param plannerConfig configuration to update. + * @param interval The early firing interval + */ + public static PlannerConfigImpl withLateFireDelay(PlannerConfigImpl plannerConfig, Time interval) { + long intervalInMillis = interval.toMilliseconds(); + Long preLateFireInterval = getMillisecondFromConfigDuration(plannerConfig, SQL_EXEC_EMIT_LATE_FIRE_DELAY); + if (preLateFireInterval != null && preLateFireInterval != intervalInMillis) { + // lateFireInterval of the two query config is not equal and not the default + throw new RuntimeException("Currently not support different lateFireInterval configs in one job"); + } + plannerConfig.setBoolean(SQL_EXEC_EMIT_LATE_FIRE_ENABLED, true); + plannerConfig.setString(SQL_EXEC_EMIT_LATE_FIRE_DELAY, intervalInMillis + " ms"); + return plannerConfig; + } + + /** + * Returns the late firing delay in milli second. + * + * @param plannerConfig the configuration object + * @return the late firing interval in milli second. + */ + public static Long getLateFireDelay(PlannerConfigImpl plannerConfig) { + return getMillisecondFromConfigDuration(plannerConfig, SQL_EXEC_EMIT_LATE_FIRE_DELAY); + } + + /** + * Returns time in milli second. + * + * @param plannerConfig the configuration object + * @param config config to fetch + * @return time in milli second. + */ + public static Long getMillisecondFromConfigDuration(PlannerConfigImpl plannerConfig, ConfigOption<String> config) { + String timeStr = plannerConfig.getString(config); + if (timeStr != null) { + Duration duration = Duration.create(timeStr); + if (duration.isFinite()) { + return duration.toMillis(); + } else { + throw new IllegalArgumentException(config.key() + " must be finite."); + } + } else { + return null; + } + } + + /** + * Returns whether a operator is disabled. + * + * @param plannerConfig the configuration object + * @return return true if the operator is disabled, else false. + */ + public static boolean isOperatorDisabled(PlannerConfigImpl plannerConfig, DisabledOperatorType operatorType) { + String disabledOperators = plannerConfig.getString(SQL_EXEC_DISABLED_OPERATORS); + String[] splittedDisabledOperators = disabledOperators.split(","); + Set<DisabledOperatorType> disabledOperatorSet = new HashSet<>(); + for (String splittedDisabledOperator : splittedDisabledOperators) { + splittedDisabledOperator = splittedDisabledOperator.trim(); + if (splittedDisabledOperator.isEmpty()) { + continue; + } + if (splittedDisabledOperator.equals("HashJoin")) { + disabledOperatorSet.add(DisabledOperatorType.BroadcastHashJoin); + disabledOperatorSet.add(DisabledOperatorType.ShuffleHashJoin); + } else { + disabledOperatorSet.add(DisabledOperatorType.valueOf(splittedDisabledOperator)); + } + } + return disabledOperatorSet.contains(operatorType); + } + + /** + * Returns the aggregate phase strategy configuration. + * + * @param plannerConfig the configuration object + * @return the aggregate phase strategy configuration. + */ + public static AggregatePhaseStrategy getAggPhaseEnforceStrategy(PlannerConfigImpl plannerConfig) { + String aggPhaseConf = plannerConfig.getString(SQL_OPTIMIZER_AGG_PHASE_STRATEGY).trim(); + if (aggPhaseConf.isEmpty()) { + return AggregatePhaseStrategy.AUTO; + } else { + return AggregatePhaseStrategy.valueOf(aggPhaseConf); + } + } + + /** + * Infers {@link PlannerConfigImpl} from {@link TableConfig}. + */ + public static PlannerConfigImpl from(TableConfig tableConfig) { Review comment: this static method can be provided by `PlannerConfigImpl` directly? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
