Copilot commented on code in PR #2364: URL: https://github.com/apache/fluss/pull/2364#discussion_r2686449821
########## fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter; + +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Map; + +import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME; +import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX; +import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix; +import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; + +/** + * The entrypoint logic for building and launching a Fluss-to-Lake (e.g., Paimon) data tiering job. 
+ * + * <p>This class is responsible for parsing configuration parameters, initializing the Flink + * execution environment, and coordinating the construction of the tiering pipeline. + * + * <p>Design Motivation: By decoupling the logic from {@link FlussLakeTieringEntrypoint} into this + * class, extensibility is significantly improved. Developers can now extend this class to customize + * configuration extraction (e.g., injecting internal security tokens) without duplicating the core + * entrypoint boilerplate. + */ +public class FlussLakeTiering { + + private static final String FLUSS_CONF_PREFIX = "fluss."; + private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering."; + + protected final StreamExecutionEnvironment execEnv; + protected final String dataLake; + protected final Map<String, String> flussConfigMap; + protected final Map<String, String> lakeConfigMap; + protected final Map<String, String> lakeTieringConfigMap; + + public FlussLakeTiering(String[] args) { + // parse params + final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args); + Map<String, String> paramsMap = params.toMap(); + + // extract fluss config + flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX); + // we need to get bootstrap.servers + String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key()); + if (bootstrapServers == null) { + throw new IllegalArgumentException( + String.format( + "The bootstrap server to fluss is not configured, please configure %s", + FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key())); + } + flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers); Review Comment: The bootstrap.servers value is retrieved from flussConfigMap and then immediately put back into the same map with the same key. This operation is redundant since the value already exists in the map at that key. Consider removing line 73 or restructuring the validation logic. 
```suggestion ``` ########## fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter; + +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Map; + +import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME; +import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX; +import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix; +import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; + +/** + * The entrypoint logic for building and launching a Fluss-to-Lake (e.g., Paimon) data tiering job. 
+ * + * <p>This class is responsible for parsing configuration parameters, initializing the Flink + * execution environment, and coordinating the construction of the tiering pipeline. + * + * <p>Design Motivation: By decoupling the logic from {@link FlussLakeTieringEntrypoint} into this + * class, extensibility is significantly improved. Developers can now extend this class to customize + * configuration extraction (e.g., injecting internal security tokens) without duplicating the core + * entrypoint boilerplate. + */ +public class FlussLakeTiering { + + private static final String FLUSS_CONF_PREFIX = "fluss."; + private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering."; + + protected final StreamExecutionEnvironment execEnv; + protected final String dataLake; + protected final Map<String, String> flussConfigMap; + protected final Map<String, String> lakeConfigMap; + protected final Map<String, String> lakeTieringConfigMap; + + public FlussLakeTiering(String[] args) { + // parse params + final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args); + Map<String, String> paramsMap = params.toMap(); + + // extract fluss config + flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX); + // we need to get bootstrap.servers + String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key()); + if (bootstrapServers == null) { + throw new IllegalArgumentException( + String.format( + "The bootstrap server to fluss is not configured, please configure %s", + FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key())); + } + flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers); + + dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key()); + if (dataLake == null) { + throw new IllegalArgumentException( + ConfigOptions.DATALAKE_FORMAT.key() + " is not configured"); + } + + // extract lake config + lakeConfigMap = + extractAndRemovePrefix( + paramsMap, String.format("%s%s.", 
DATA_LAKE_CONFIG_PREFIX, dataLake)); + + // extract tiering service config + lakeTieringConfigMap = extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX); + + // now, we must use full restart strategy if any task is failed, + // since committer is stateless, if tiering committer is failover, committer + // will lost the collected committable, and will never collect all committable to do commit + // todo: support region failover + org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration(); + flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME); + + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig); + } + + protected void run() throws Exception { + // build an run lake tiering job Review Comment: Typo in comment: "an run" should be "and run". ```suggestion // build and run lake tiering job ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
