This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 96b2aa0b6684d0b06602c128a45f8886adf9324a Author: Rui Fan <[email protected]> AuthorDate: Fri Dec 29 23:17:07 2023 +0800 [FLINK-33450][autoscaler] StandaloneAutoscalerEntrypoint supports the JDBCAutoScalerStateStore --- .../autoscaler_standalone_configuration.html | 24 ++++++ flink-autoscaler-plugin-jdbc/pom.xml | 1 - flink-autoscaler-standalone/pom.xml | 14 ++++ .../standalone/AutoscalerStateStoreFactory.java | 93 ++++++++++++++++++++++ .../standalone/StandaloneAutoscalerEntrypoint.java | 12 +-- .../config/AutoscalerStandaloneOptions.java | 50 ++++++++++++ .../AutoscalerStateStoreFactoryTest.java | 82 +++++++++++++++++++ pom.xml | 2 + 8 files changed, 272 insertions(+), 6 deletions(-) diff --git a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html index f385c5b0..4cfdc52f 100644 --- a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html +++ b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html @@ -32,5 +32,29 @@ <td>Integer</td> <td>The port of flink cluster when the flink-cluster fetcher is used.</td> </tr> + <tr> + <td><h5>autoscaler.standalone.state-store.jdbc.password-env-variable</h5></td> + <td style="word-wrap: break-word;">"STATE_STORE_JDBC_PWD"</td> + <td>String</td> + <td>The environment variable name of jdbc state store password when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>. In general, the environment variable name doesn't need to be changed. Users need to export the password using this environment variable.</td> + </tr> + <tr> + <td><h5>autoscaler.standalone.state-store.jdbc.url</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The jdbc url of jdbc state store when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>, such as: <code class="highlighter-rouge">jdbc:mysql://localhost:3306/flink_autoscaler</code>.<br />This option is required when using JDBC state store.</td> + </tr> + <tr> + <td><h5>autoscaler.standalone.state-store.jdbc.username</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The jdbc username of jdbc state store when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>.</td> + </tr> + <tr> + <td><h5>autoscaler.standalone.state-store.type</h5></td> + <td style="word-wrap: break-word;">MEMORY</td> + <td><p>Enum</p></td> + <td>The autoscaler state store type.<br /><br />Possible values:<ul><li>"MEMORY": The state store based on the Java Heap, the state will be discarded after process restarts.</li><li>"JDBC": The state store which persists its state in JDBC related database. It's recommended in production.</li></ul></td> + </tr> </tbody> </table> diff --git a/flink-autoscaler-plugin-jdbc/pom.xml b/flink-autoscaler-plugin-jdbc/pom.xml index e009e3f0..adadedda 100644 --- a/flink-autoscaler-plugin-jdbc/pom.xml +++ b/flink-autoscaler-plugin-jdbc/pom.xml @@ -33,7 +33,6 @@ under the License. <properties> <testcontainers.version>1.18.2</testcontainers.version> - <derby.version>10.15.2.0</derby.version> <postgres.version>42.5.4</postgres.version> <mysql.version>8.0.33</mysql.version> </properties> diff --git a/flink-autoscaler-standalone/pom.xml b/flink-autoscaler-standalone/pom.xml index ec7af293..85a6533e 100644 --- a/flink-autoscaler-standalone/pom.xml +++ b/flink-autoscaler-standalone/pom.xml @@ -39,6 +39,12 @@ under the License. <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-autoscaler-plugin-jdbc</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime</artifactId> @@ -157,6 +163,14 @@ under the License. <scope>test</scope> </dependency> + <!-- Derby tests --> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>${derby.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java new file mode 100644 index 00000000..4b0ed170 --- /dev/null +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.standalone; + +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore; +import org.apache.flink.autoscaler.jdbc.state.JdbcStateInteractor; +import org.apache.flink.autoscaler.jdbc.state.JdbcStateStore; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DescribedEnum; +import org.apache.flink.configuration.description.InlineElement; + +import java.sql.DriverManager; + +import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC; +import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_USERNAME; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE; +import static org.apache.flink.configuration.description.TextElement.text; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The factory of {@link AutoScalerStateStore}. */ +public class AutoscalerStateStoreFactory { + + /** Out-of-box state store type. */ + public enum StateStoreType implements DescribedEnum { + MEMORY( + "The state store based on the Java Heap, the state will be discarded after process restarts."), + JDBC( + "The state store which persists its state in JDBC related database. It's recommended in production."); + + private final InlineElement description; + + StateStoreType(String description) { + this.description = text(description); + } + + @Override + public InlineElement getDescription() { + return description; + } + } + + public static <KEY, Context extends JobAutoScalerContext<KEY>> + AutoScalerStateStore<KEY, Context> create(Configuration conf) throws Exception { + var stateStoreType = conf.get(STATE_STORE_TYPE); + switch (stateStoreType) { + case MEMORY: + return new InMemoryAutoScalerStateStore<>(); + case JDBC: + return createJdbcStateStore(conf); + default: + throw new IllegalArgumentException( + String.format( + "Unknown state store type : %s. Optional state store types are: %s and %s.", + stateStoreType, MEMORY, JDBC)); + } + } + + private static <KEY, Context extends JobAutoScalerContext<KEY>> + AutoScalerStateStore<KEY, Context> createJdbcStateStore(Configuration conf) + throws Exception { + final var jdbcUrl = conf.get(STATE_STORE_JDBC_URL); + checkArgument( + jdbcUrl != null, + "%s is required for jdbc state store.", + STATE_STORE_JDBC_URL.key()); + var user = conf.get(STATE_STORE_JDBC_USERNAME); + var password = System.getenv().get(conf.get(STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE)); + + var conn = DriverManager.getConnection(jdbcUrl, user, password); + return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new JdbcStateInteractor(conn))); + } +} diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java index fa635f81..35eba18c 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java @@ -30,7 +30,6 @@ import org.apache.flink.autoscaler.event.LoggingEventHandler; import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher; import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer; import org.apache.flink.autoscaler.state.AutoScalerStateStore; -import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; @@ -48,14 +47,17 @@ public class StandaloneAutoscalerEntrypoint { private static final Logger LOG = LoggerFactory.getLogger(StandaloneAutoscalerEntrypoint.class); - public static <KEY, Context extends JobAutoScalerContext<KEY>> void main(String[] args) { + public static <KEY, Context extends JobAutoScalerContext<KEY>> void main(String[] args) + throws Exception { var conf = ParameterTool.fromArgs(args).getConfiguration(); LOG.info("The standalone autoscaler is started, configuration: {}", conf); // Initialize JobListFetcher and JobAutoScaler. var eventHandler = new LoggingEventHandler<KEY, Context>(); JobListFetcher<KEY, Context> jobListFetcher = createJobListFetcher(conf); - var autoScaler = createJobAutoscaler(eventHandler); + + AutoScalerStateStore<KEY, Context> stateStore = AutoscalerStateStoreFactory.create(conf); + var autoScaler = createJobAutoscaler(eventHandler, stateStore); var autoscalerExecutor = new StandaloneAutoscalerExecutor<>(conf, jobListFetcher, eventHandler, autoScaler); @@ -81,8 +83,8 @@ public class StandaloneAutoscalerEntrypoint { private static <KEY, Context extends JobAutoScalerContext<KEY>> JobAutoScaler<KEY, Context> createJobAutoscaler( - AutoScalerEventHandler<KEY, Context> eventHandler) { - AutoScalerStateStore<KEY, Context> stateStore = new InMemoryAutoScalerStateStore<>(); + AutoScalerEventHandler<KEY, Context> eventHandler, + AutoScalerStateStore<KEY, Context> stateStore) { return new JobAutoScalerImpl<>( new RestApiMetricsCollector<>(), new ScalingMetricEvaluator(), diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java index 195bc54e..510042fe 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java @@ -17,11 +17,16 @@ package org.apache.flink.autoscaler.standalone.config; +import org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.description.Description; import java.time.Duration; +import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC; +import static org.apache.flink.configuration.description.TextElement.code; + /** Config options related to the autoscaler standalone module. */ public class AutoscalerStandaloneOptions { @@ -59,4 +64,49 @@ public class AutoscalerStandaloneOptions { .withDeprecatedKeys("flinkClusterPort") .withDescription( "The port of flink cluster when the flink-cluster fetcher is used."); + + public static final ConfigOption<StateStoreType> STATE_STORE_TYPE = + autoscalerStandaloneConfig("state-store.type") + .enumType(StateStoreType.class) + .defaultValue(StateStoreType.MEMORY) + .withDescription("The autoscaler state store type."); + + public static final ConfigOption<String> STATE_STORE_JDBC_URL = + autoscalerStandaloneConfig("state-store.jdbc.url") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "The jdbc url of jdbc state store when %s has been set to %s, such as: %s.", + code(STATE_STORE_TYPE.key()), + code(JDBC.toString()), + code("jdbc:mysql://localhost:3306/flink_autoscaler")) + .linebreak() + .text("This option is required when using JDBC state store.") + .build()); + + public static final ConfigOption<String> STATE_STORE_JDBC_USERNAME = + autoscalerStandaloneConfig("state-store.jdbc.username") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "The jdbc username of jdbc state store when %s has been set to %s.", + code(STATE_STORE_TYPE.key()), code(JDBC.toString())) + .build()); + + public static final ConfigOption<String> STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE = + autoscalerStandaloneConfig("state-store.jdbc.password-env-variable") + .stringType() + .defaultValue("STATE_STORE_JDBC_PWD") + .withDescription( + Description.builder() + .text( + "The environment variable name of jdbc state store password when %s has been set to %s. " + + "In general, the environment variable name doesn't need to be changed. Users need to " + + "export the password using this environment variable.", + code(STATE_STORE_TYPE.key()), code(JDBC.toString())) + .build()); } diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java new file mode 100644 index 00000000..01f62d5c --- /dev/null +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.standalone; + +import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore; +import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.api.Test; + +import java.sql.DriverManager; +import java.sql.SQLException; + +import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC; +import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class AutoscalerStateStoreFactoryTest { + + @Test + void testCreateDefaultStateStore() throws Exception { + // Test for memory state store is created by default. + var stateStore = AutoscalerStateStoreFactory.create(new Configuration()); + assertThat(stateStore).isInstanceOf(InMemoryAutoScalerStateStore.class); + } + + @Test + void testCreateInMemoryStateStore() throws Exception { + // Test for memory state store is created explicitly. + final var conf = new Configuration(); + conf.set(STATE_STORE_TYPE, MEMORY); + var stateStore = AutoscalerStateStoreFactory.create(conf); + assertThat(stateStore).isInstanceOf(InMemoryAutoScalerStateStore.class); + } + + @Test + void testCreateJdbcStateStoreWithoutURL() { + // Test for missing the jdbc url. + final var conf = new Configuration(); + conf.set(STATE_STORE_TYPE, JDBC); + assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("%s is required for jdbc state store.", STATE_STORE_JDBC_URL.key()); + } + + @Test + void testCreateJdbcStateStore() throws Exception { + final var jdbcUrl = "jdbc:derby:memory:test"; + DriverManager.getConnection(String.format("%s;create=true", jdbcUrl)).close(); + + // Test for create JDBC State store. + final var conf = new Configuration(); + conf.set(STATE_STORE_TYPE, JDBC); + conf.set(STATE_STORE_JDBC_URL, jdbcUrl); + + var stateStore = AutoscalerStateStoreFactory.create(conf); + assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class); + + try { + DriverManager.getConnection(String.format("%s;shutdown=true", jdbcUrl)).close(); + } catch (SQLException ignored) { + } + } +} diff --git a/pom.xml b/pom.xml index 716fb9ce..3fe65467 100644 --- a/pom.xml +++ b/pom.xml @@ -107,6 +107,8 @@ under the License. It MUST be a space-separated list not containing any newlines, of entries in the form '[-]{2}add-[opens|exports]=<module>/<package>=ALL-UNNAMED'.--> <surefire.module.config/> + + <derby.version>10.15.2.0</derby.version> </properties> <dependencyManagement>
