This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 96b2aa0b6684d0b06602c128a45f8886adf9324a
Author: Rui Fan <[email protected]>
AuthorDate: Fri Dec 29 23:17:07 2023 +0800

    [FLINK-33450][autoscaler] StandaloneAutoscalerEntrypoint supports the JDBCAutoScalerStateStore
---
 .../autoscaler_standalone_configuration.html       | 24 ++++++
 flink-autoscaler-plugin-jdbc/pom.xml               |  1 -
 flink-autoscaler-standalone/pom.xml                | 14 ++++
 .../standalone/AutoscalerStateStoreFactory.java    | 93 ++++++++++++++++++++++
 .../standalone/StandaloneAutoscalerEntrypoint.java | 12 +--
 .../config/AutoscalerStandaloneOptions.java        | 50 ++++++++++++
 .../AutoscalerStateStoreFactoryTest.java           | 82 +++++++++++++++++++
 pom.xml                                            |  2 +
 8 files changed, 272 insertions(+), 6 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html 
b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
index f385c5b0..4cfdc52f 100644
--- a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
+++ b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
@@ -32,5 +32,29 @@
             <td>Integer</td>
             <td>The port of flink cluster when the flink-cluster fetcher is 
used.</td>
         </tr>
+        <tr>
+            
<td><h5>autoscaler.standalone.state-store.jdbc.password-env-variable</h5></td>
+            <td style="word-wrap: break-word;">"STATE_STORE_JDBC_PWD"</td>
+            <td>String</td>
+            <td>The environment variable name of jdbc state store password 
when <code 
class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has 
been set to <code class="highlighter-rouge">JDBC</code>. In general, the 
environment variable name doesn't need to be changed. Users need to export the 
password using this environment variable.</td>
+        </tr>
+        <tr>
+            <td><h5>autoscaler.standalone.state-store.jdbc.url</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The jdbc url of jdbc state store when <code 
class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has 
been set to <code class="highlighter-rouge">JDBC</code>, such as: <code 
class="highlighter-rouge">jdbc:mysql://localhost:3306/flink_autoscaler</code>.<br
 />This option is required when using JDBC state store.</td>
+        </tr>
+        <tr>
+            <td><h5>autoscaler.standalone.state-store.jdbc.username</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The jdbc username of jdbc state store when <code 
class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has 
been set to <code class="highlighter-rouge">JDBC</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>autoscaler.standalone.state-store.type</h5></td>
+            <td style="word-wrap: break-word;">MEMORY</td>
+            <td><p>Enum</p></td>
+            <td>The autoscaler state store type.<br /><br />Possible 
values:<ul><li>"MEMORY": The state store based on the Java Heap, the state will 
be discarded after process restarts.</li><li>"JDBC": The state store which 
persists its state in JDBC related database. It's recommended in 
production.</li></ul></td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-autoscaler-plugin-jdbc/pom.xml 
b/flink-autoscaler-plugin-jdbc/pom.xml
index e009e3f0..adadedda 100644
--- a/flink-autoscaler-plugin-jdbc/pom.xml
+++ b/flink-autoscaler-plugin-jdbc/pom.xml
@@ -33,7 +33,6 @@ under the License.
 
     <properties>
         <testcontainers.version>1.18.2</testcontainers.version>
-        <derby.version>10.15.2.0</derby.version>
         <postgres.version>42.5.4</postgres.version>
         <mysql.version>8.0.33</mysql.version>
     </properties>
diff --git a/flink-autoscaler-standalone/pom.xml 
b/flink-autoscaler-standalone/pom.xml
index ec7af293..85a6533e 100644
--- a/flink-autoscaler-standalone/pom.xml
+++ b/flink-autoscaler-standalone/pom.xml
@@ -39,6 +39,12 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-autoscaler-plugin-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime</artifactId>
@@ -157,6 +163,14 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <!-- Derby tests -->
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>${derby.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
new file mode 100644
index 00000000..4b0ed170
--- /dev/null
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore;
+import org.apache.flink.autoscaler.jdbc.state.JdbcStateInteractor;
+import org.apache.flink.autoscaler.jdbc.state.JdbcStateStore;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import java.sql.DriverManager;
+
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_USERNAME;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The factory of {@link AutoScalerStateStore}.
+ *
+ * <p>{@link #create(Configuration)} reads {@code STATE_STORE_TYPE} from the
+ * configuration and returns an {@link InMemoryAutoScalerStateStore} for
+ * {@code MEMORY} or a {@link JdbcAutoScalerStateStore} for {@code JDBC}. Any
+ * other value is rejected with an {@link IllegalArgumentException}.
+ *
+ * <p>For the JDBC store, {@code STATE_STORE_JDBC_URL} must be set, otherwise
+ * the argument check fails. The username is read from
+ * {@code STATE_STORE_JDBC_USERNAME}, and the password is looked up in the
+ * process environment under the variable name configured by
+ * {@code STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE}. The connection is opened
+ * through {@link DriverManager} and passed to the created store; this factory
+ * does not close it.
+ */
+public class AutoscalerStateStoreFactory {
+
+    /**
+     * Out-of-box state store type. Each constant wraps its description text
+     * in an {@link InlineElement}, which {@link #getDescription()} returns.
+     */
+    public enum StateStoreType implements DescribedEnum {
+        MEMORY(
+                "The state store based on the Java Heap, the state will be 
discarded after process restarts."),
+        JDBC(
+                "The state store which persists its state in JDBC related 
database. It's recommended in production.");
+
+        private final InlineElement description;
+
+        StateStoreType(String description) {
+            this.description = text(description);
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            AutoScalerStateStore<KEY, Context> create(Configuration conf) 
throws Exception {
+        var stateStoreType = conf.get(STATE_STORE_TYPE);
+        switch (stateStoreType) {
+            case MEMORY:
+                return new InMemoryAutoScalerStateStore<>();
+            case JDBC:
+                return createJdbcStateStore(conf);
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unknown state store type : %s. Optional state 
store types are: %s and %s.",
+                                stateStoreType, MEMORY, JDBC));
+        }
+    }
+
+    private static <KEY, Context extends JobAutoScalerContext<KEY>>
+            AutoScalerStateStore<KEY, Context> 
createJdbcStateStore(Configuration conf)
+                    throws Exception {
+        final var jdbcUrl = conf.get(STATE_STORE_JDBC_URL);
+        checkArgument(
+                jdbcUrl != null,
+                "%s is required for jdbc state store.",
+                STATE_STORE_JDBC_URL.key());
+        var user = conf.get(STATE_STORE_JDBC_USERNAME); // the option declares no default value
+        var password = 
System.getenv().get(conf.get(STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE)); // null when the variable is not exported
+
+        var conn = DriverManager.getConnection(jdbcUrl, user, password); // not closed here; passed to the JdbcStateInteractor below
+        return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new 
JdbcStateInteractor(conn)));
+    }
+}
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
index fa635f81..35eba18c 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
@@ -30,7 +30,6 @@ import org.apache.flink.autoscaler.event.LoggingEventHandler;
 import 
org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher;
 import 
org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
-import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
@@ -48,14 +47,17 @@ public class StandaloneAutoscalerEntrypoint {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StandaloneAutoscalerEntrypoint.class);
 
-    public static <KEY, Context extends JobAutoScalerContext<KEY>> void 
main(String[] args) {
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void 
main(String[] args)
+            throws Exception {
         var conf = ParameterTool.fromArgs(args).getConfiguration();
         LOG.info("The standalone autoscaler is started, configuration: {}", 
conf);
 
         // Initialize JobListFetcher and JobAutoScaler.
         var eventHandler = new LoggingEventHandler<KEY, Context>();
         JobListFetcher<KEY, Context> jobListFetcher = 
createJobListFetcher(conf);
-        var autoScaler = createJobAutoscaler(eventHandler);
+
+        AutoScalerStateStore<KEY, Context> stateStore = 
AutoscalerStateStoreFactory.create(conf);
+        var autoScaler = createJobAutoscaler(eventHandler, stateStore);
 
         var autoscalerExecutor =
                 new StandaloneAutoscalerExecutor<>(conf, jobListFetcher, 
eventHandler, autoScaler);
@@ -81,8 +83,8 @@ public class StandaloneAutoscalerEntrypoint {
 
     private static <KEY, Context extends JobAutoScalerContext<KEY>>
             JobAutoScaler<KEY, Context> createJobAutoscaler(
-                    AutoScalerEventHandler<KEY, Context> eventHandler) {
-        AutoScalerStateStore<KEY, Context> stateStore = new 
InMemoryAutoScalerStateStore<>();
+                    AutoScalerEventHandler<KEY, Context> eventHandler,
+                    AutoScalerStateStore<KEY, Context> stateStore) {
         return new JobAutoScalerImpl<>(
                 new RestApiMetricsCollector<>(),
                 new ScalingMetricEvaluator(),
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
index 195bc54e..510042fe 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
@@ -17,11 +17,16 @@
 
 package org.apache.flink.autoscaler.standalone.config;
 
+import 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
 
 import java.time.Duration;
 
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
+import static org.apache.flink.configuration.description.TextElement.code;
+
 /** Config options related to the autoscaler standalone module. */
 public class AutoscalerStandaloneOptions {
 
@@ -59,4 +64,49 @@ public class AutoscalerStandaloneOptions {
                     .withDeprecatedKeys("flinkClusterPort")
                     .withDescription(
                             "The port of flink cluster when the flink-cluster 
fetcher is used.");
+
+    public static final ConfigOption<StateStoreType> STATE_STORE_TYPE =
+            autoscalerStandaloneConfig("state-store.type")
+                    .enumType(StateStoreType.class)
+                    .defaultValue(StateStoreType.MEMORY)
+                    .withDescription("The autoscaler state store type.");
+
+    public static final ConfigOption<String> STATE_STORE_JDBC_URL =
+            autoscalerStandaloneConfig("state-store.jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The jdbc url of jdbc state store 
when %s has been set to %s, such as: %s.",
+                                            code(STATE_STORE_TYPE.key()),
+                                            code(JDBC.toString()),
+                                            
code("jdbc:mysql://localhost:3306/flink_autoscaler"))
+                                    .linebreak()
+                                    .text("This option is required when using 
JDBC state store.")
+                                    .build());
+
+    public static final ConfigOption<String> STATE_STORE_JDBC_USERNAME =
+            autoscalerStandaloneConfig("state-store.jdbc.username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The jdbc username of jdbc state 
store when %s has been set to %s.",
+                                            code(STATE_STORE_TYPE.key()), 
code(JDBC.toString()))
+                                    .build());
+
+    public static final ConfigOption<String> 
STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE =
+            
autoscalerStandaloneConfig("state-store.jdbc.password-env-variable")
+                    .stringType()
+                    .defaultValue("STATE_STORE_JDBC_PWD")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The environment variable name of 
jdbc state store password when %s has been set to %s. "
+                                                    + "In general, the 
environment variable name doesn't need to be changed. Users need to "
+                                                    + "export the password 
using this environment variable.",
+                                            code(STATE_STORE_TYPE.key()), 
code(JDBC.toString()))
+                                    .build());
 }
diff --git 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
new file mode 100644
index 00000000..01f62d5c
--- /dev/null
+++ 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class AutoscalerStateStoreFactoryTest {
+
+    @Test
+    void testCreateDefaultStateStore() throws Exception {
+        // An empty configuration must yield the in-memory state store.
+        var stateStore = AutoscalerStateStoreFactory.create(new 
Configuration());
+        
assertThat(stateStore).isInstanceOf(InMemoryAutoScalerStateStore.class);
+    }
+
+    @Test
+    void testCreateInMemoryStateStore() throws Exception {
+        // Explicitly selecting MEMORY must yield the in-memory state store.
+        final var conf = new Configuration();
+        conf.set(STATE_STORE_TYPE, MEMORY);
+        var stateStore = AutoscalerStateStoreFactory.create(conf);
+        
assertThat(stateStore).isInstanceOf(InMemoryAutoScalerStateStore.class);
+    }
+
+    @Test
+    void testCreateJdbcStateStoreWithoutURL() {
+        // Selecting JDBC without a jdbc url must fail the argument check.
+        final var conf = new Configuration();
+        conf.set(STATE_STORE_TYPE, JDBC);
+        assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("%s is required for jdbc state store.", 
STATE_STORE_JDBC_URL.key());
+    }
+
+    @Test
+    void testCreateJdbcStateStore() throws Exception {
+        final var jdbcUrl = "jdbc:derby:memory:test";
+        DriverManager.getConnection(String.format("%s;create=true", 
jdbcUrl)).close(); // create the in-memory Derby database before the factory connects to it
+
+        // With only the type and a jdbc url configured, the factory must return the JDBC state store.
+        final var conf = new Configuration();
+        conf.set(STATE_STORE_TYPE, JDBC);
+        conf.set(STATE_STORE_JDBC_URL, jdbcUrl);
+
+        var stateStore = AutoscalerStateStoreFactory.create(conf);
+        assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class);
+
+        try {
+            DriverManager.getConnection(String.format("%s;shutdown=true", 
jdbcUrl)).close();
+        } catch (SQLException ignored) { // NOTE(review): presumably Derby reports a shutdown by throwing SQLException - confirm
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index 716fb9ce..3fe65467 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,8 @@ under the License.
                     It MUST be a space-separated list not containing any 
newlines,
                     of entries in the form 
'[-]{2}add-[opens|exports]=<module>/<package>=ALL-UNNAMED'.-->
         <surefire.module.config/>
+
+        <derby.version>10.15.2.0</derby.version>
     </properties>
 
     <dependencyManagement>

Reply via email to