This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 293067ab3fa227eb6a38ef5d1255fa4641da142c Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Fri Dec 15 18:50:39 2023 +0800 [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore --- .github/workflows/ci.yml | 5 + flink-autoscaler-plugin-jdbc/pom.xml | 146 ++++++++++ .../jdbc/state/JdbcAutoScalerStateStore.java | 253 +++++++++++++++++ .../autoscaler/jdbc/state/JdbcStateInteractor.java | 119 ++++++++ .../autoscaler/jdbc/state/JdbcStateStore.java | 92 +++++++ .../flink/autoscaler/jdbc/state/JobStateView.java | 262 ++++++++++++++++++ .../flink/autoscaler/jdbc/state/StateType.java | 58 ++++ .../src/main/resources/schema/derby_schema.sql | 26 ++ .../src/main/resources/schema/mysql_schema.sql | 32 +++ .../src/main/resources/schema/postgres_schema.sql | 44 +++ .../state/AbstractJdbcStateInteractorITCase.java | 93 +++++++ .../jdbc/state/AbstractJdbcStateStoreITCase.java | 304 +++++++++++++++++++++ .../jdbc/state/CountableJdbcStateInteractor.java | 81 ++++++ .../jdbc/state/JdbcAutoScalerStateStoreTest.java | 102 +++++++ .../autoscaler/jdbc/state/JobStateViewTest.java | 201 ++++++++++++++ .../flink/autoscaler/jdbc/state/StateTypeTest.java | 45 +++ .../jdbc/testutils/databases/DatabaseTest.java | 26 ++ .../testutils/databases/derby/DerbyExtension.java | 88 ++++++ .../testutils/databases/derby/DerbyTestBase.java | 35 +++ .../testutils/databases/mysql/MySQL56TestBase.java | 34 +++ .../testutils/databases/mysql/MySQL57TestBase.java | 34 +++ .../testutils/databases/mysql/MySQL8TestBase.java | 34 +++ .../testutils/databases/mysql/MySQLExtension.java | 76 ++++++ .../databases/postgres/PostgreSQLExtension.java | 75 +++++ .../databases/postgres/PostgreSQLTestBase.java | 34 +++ .../src/test/resources/log4j2.properties | 27 ++ .../test/resources/test_schema/mysql_schema.sql | 28 ++ .../test/resources/test_schema/postgres_schema.sql | 27 ++ pom.xml | 1 + 29 files changed, 2382 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 
40bdfc64..3734b59b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,6 +66,11 @@ jobs: cd flink-kubernetes-webhook mvn verify -Dit.skip=false cd .. + - name: Tests in flink-autoscaler-plugin-jdbc + run: | + cd flink-autoscaler-plugin-jdbc + mvn verify -Dit.skip=false + cd .. e2e_ci: runs-on: ubuntu-latest strategy: diff --git a/flink-autoscaler-plugin-jdbc/pom.xml b/flink-autoscaler-plugin-jdbc/pom.xml new file mode 100644 index 00000000..e009e3f0 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/pom.xml @@ -0,0 +1,146 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
-->
<!--
  Build descriptor of the JDBC state store plugin for the Flink autoscaler.
  The databases used by the tests are Derby, PostgreSQL and MySQL (see the test dependencies).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>flink-kubernetes-operator-parent</artifactId>
        <groupId>org.apache.flink</groupId>
        <version>1.8-SNAPSHOT</version>
        <relativePath>..</relativePath>
    </parent>

    <artifactId>flink-autoscaler-plugin-jdbc</artifactId>
    <name>Flink Autoscaler Plugin JDBC</name>
    <packaging>jar</packaging>

    <!-- Versions of the test-only database drivers and of Testcontainers. -->
    <properties>
        <testcontainers.version>1.18.2</testcontainers.version>
        <derby.version>10.15.2.0</derby.version>
        <postgres.version>42.5.4</postgres.version>
        <mysql.version>8.0.33</mysql.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- Imported BOM: the org.testcontainers modules below therefore declare no version. -->
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>${testcontainers.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>

    </dependencyManagement>

    <dependencies>

        <!-- "provided" scope: needed to compile, not bundled with this artifact. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-autoscaler</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done
            after the flink-1.18.1 is released. -->
        <!-- No version here; presumably managed by the parent POM (not visible in this module). -->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

        <!-- Test dependencies -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-autoscaler</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <classifier>tests</classifier>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils-junit</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Derby tests -->
        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>${derby.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Postgres tests -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>${postgres.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>postgresql</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- MySQL tests -->
        <!-- Connector/J moved to com.mysql:mysql-connector-j as of 8.0.31; the former
             mysql:mysql-connector-java coordinates only resolve through a relocation stub. -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>${mysql.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>mysql</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>
file mode 100644 index 00000000..8a78782b --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.ScalingTracking; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.time.Instant; +import 
java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; + +import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS; +import static org.apache.flink.autoscaler.jdbc.state.StateType.PARALLELISM_OVERRIDES; +import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY; +import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING; + +/** + * The state store which persists its state in JDBC related database. + * + * @param <KEY> The job key. + * @param <Context> The job autoscaler context. + */ +@Experimental +public class JdbcAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> + implements AutoScalerStateStore<KEY, Context> { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcAutoScalerStateStore.class); + + private final JdbcStateStore jdbcStateStore; + + protected static final ObjectMapper YAML_MAPPER = + new ObjectMapper() + .registerModule(new JavaTimeModule()) + .registerModule(new AutoScalerSerDeModule()); + + public JdbcAutoScalerStateStore(JdbcStateStore jdbcStateStore) { + this.jdbcStateStore = jdbcStateStore; + } + + @Override + public void storeScalingHistory( + Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) + throws Exception { + jdbcStateStore.putSerializedState( + getSerializeKey(jobContext), + SCALING_HISTORY, + serializeScalingHistory(scalingHistory)); + } + + @Nonnull + @Override + public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory( + Context jobContext) { + Optional<String> serializedScalingHistory = + jdbcStateStore.getSerializedState(getSerializeKey(jobContext), SCALING_HISTORY); + if (serializedScalingHistory.isEmpty()) { + return new HashMap<>(); + } + try { + return deserializeScalingHistory(serializedScalingHistory.get()); + } catch (JacksonException e) { + LOG.error( + "Could not deserialize scaling history, 
possibly the format changed. Discarding...", + e); + jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), SCALING_HISTORY); + return new HashMap<>(); + } + } + + @Override + public void removeScalingHistory(Context jobContext) { + jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), SCALING_HISTORY); + } + + @Override + public void storeScalingTracking(Context jobContext, ScalingTracking scalingTrack) + throws Exception { + jdbcStateStore.putSerializedState( + getSerializeKey(jobContext), + SCALING_TRACKING, + serializeScalingTracking(scalingTrack)); + } + + @Override + public ScalingTracking getScalingTracking(Context jobContext) { + Optional<String> serializedRescalingHistory = + jdbcStateStore.getSerializedState(getSerializeKey(jobContext), SCALING_TRACKING); + if (serializedRescalingHistory.isEmpty()) { + return new ScalingTracking(); + } + try { + return deserializeScalingTracking(serializedRescalingHistory.get()); + } catch (JacksonException e) { + LOG.error( + "Could not deserialize rescaling history, possibly the format changed. 
Discarding...", + e); + jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), SCALING_TRACKING); + return new ScalingTracking(); + } + } + + @Override + public void storeCollectedMetrics( + Context jobContext, SortedMap<Instant, CollectedMetrics> metrics) throws Exception { + jdbcStateStore.putSerializedState( + getSerializeKey(jobContext), COLLECTED_METRICS, serializeEvaluatedMetrics(metrics)); + } + + @Nonnull + @Override + public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context jobContext) { + Optional<String> serializedEvaluatedMetricsOpt = + jdbcStateStore.getSerializedState(getSerializeKey(jobContext), COLLECTED_METRICS); + if (serializedEvaluatedMetricsOpt.isEmpty()) { + return new TreeMap<>(); + } + try { + return deserializeEvaluatedMetrics(serializedEvaluatedMetricsOpt.get()); + } catch (JacksonException e) { + LOG.error( + "Could not deserialize metric history, possibly the format changed. Discarding...", + e); + jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), COLLECTED_METRICS); + return new TreeMap<>(); + } + } + + @Override + public void removeCollectedMetrics(Context jobContext) { + jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), COLLECTED_METRICS); + } + + @Override + public void storeParallelismOverrides( + Context jobContext, Map<String, String> parallelismOverrides) { + jdbcStateStore.putSerializedState( + getSerializeKey(jobContext), + PARALLELISM_OVERRIDES, + serializeParallelismOverrides(parallelismOverrides)); + } + + @Nonnull + @Override + public Map<String, String> getParallelismOverrides(Context jobContext) { + return jdbcStateStore + .getSerializedState(getSerializeKey(jobContext), PARALLELISM_OVERRIDES) + .map(JdbcAutoScalerStateStore::deserializeParallelismOverrides) + .orElse(new HashMap<>()); + } + + @Override + public void removeParallelismOverrides(Context jobContext) { + jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), PARALLELISM_OVERRIDES); + } + + 
@Override + public void clearAll(Context jobContext) { + jdbcStateStore.clearAll(getSerializeKey(jobContext)); + } + + @Override + public void flush(Context jobContext) throws Exception { + jdbcStateStore.flush(getSerializeKey(jobContext)); + } + + @Override + public void removeInfoFromCache(KEY jobKey) { + jdbcStateStore.removeInfoFromCache(getSerializeKey(jobKey)); + } + + private String getSerializeKey(Context jobContext) { + return getSerializeKey(jobContext.getJobKey()); + } + + private String getSerializeKey(KEY jobKey) { + return jobKey.toString(); + } + + // The serialization and deserialization are similar to KubernetesAutoScalerStateStore + protected static String serializeScalingHistory( + Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) throws Exception { + return YAML_MAPPER.writeValueAsString(scalingHistory); + } + + private static Map<JobVertexID, SortedMap<Instant, ScalingSummary>> deserializeScalingHistory( + String scalingHistory) throws JacksonException { + return YAML_MAPPER.readValue(scalingHistory, new TypeReference<>() {}); + } + + protected static String serializeScalingTracking(ScalingTracking scalingTracking) + throws Exception { + return YAML_MAPPER.writeValueAsString(scalingTracking); + } + + private static ScalingTracking deserializeScalingTracking(String scalingTracking) + throws JacksonException { + return YAML_MAPPER.readValue(scalingTracking, new TypeReference<>() {}); + } + + @VisibleForTesting + protected static String serializeEvaluatedMetrics( + SortedMap<Instant, CollectedMetrics> evaluatedMetrics) throws Exception { + return YAML_MAPPER.writeValueAsString(evaluatedMetrics); + } + + private static SortedMap<Instant, CollectedMetrics> deserializeEvaluatedMetrics( + String evaluatedMetrics) throws JacksonException { + return YAML_MAPPER.readValue(evaluatedMetrics, new TypeReference<>() {}); + } + + private static String serializeParallelismOverrides(Map<String, String> overrides) { + return 
ConfigurationUtils.convertValue(overrides, String.class); + } + + private static Map<String, String> deserializeParallelismOverrides(String overrides) { + return ConfigurationUtils.convertValue(overrides, Map.class); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java new file mode 100644 index 00000000..4c2f9a0f --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Responsible for interacting with the database. 
*/ +public class JdbcStateInteractor { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcStateInteractor.class); + + private final Connection conn; + + public JdbcStateInteractor(Connection conn) { + this.conn = conn; + } + + public Map<StateType, String> queryData(String jobKey) throws Exception { + var query = + "select state_type, state_value from t_flink_autoscaler_state_store where job_key = ?"; + var data = new HashMap<StateType, String>(); + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setString(1, jobKey); + var rs = pstmt.executeQuery(); + while (rs.next()) { + var stateTypeIdentifier = rs.getString("state_type"); + var stateType = StateType.createFromIdentifier(stateTypeIdentifier); + var stateValue = rs.getString("state_value"); + data.put(stateType, stateValue); + } + } + return data; + } + + public void deleteData(String jobKey, List<StateType> deletedStateTypes) throws Exception { + var query = + String.format( + "DELETE FROM t_flink_autoscaler_state_store where job_key = ? and state_type in (%s)", + String.join(",", Collections.nCopies(deletedStateTypes.size(), "?"))); + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setString(1, jobKey); + int i = 2; + for (var stateType : deletedStateTypes) { + pstmt.setString(i++, stateType.getIdentifier()); + } + pstmt.execute(); + } + LOG.info("Delete jobKey: {} stateTypes: {} from database.", jobKey, deletedStateTypes); + } + + public void createData( + String jobKey, List<StateType> createdStateTypes, Map<StateType, String> data) + throws Exception { + var query = + "INSERT INTO t_flink_autoscaler_state_store (job_key, state_type, state_value) values (?, ?, ?)"; + try (var pstmt = conn.prepareStatement(query)) { + for (var stateType : createdStateTypes) { + pstmt.setString(1, jobKey); + pstmt.setString(2, stateType.getIdentifier()); + + String stateValue = data.get(stateType); + checkState( + stateValue != null, + "The state value shouldn't be null during inserting. 
" + + "It may be a bug, please raise a JIRA to Flink Community."); + pstmt.setString(3, stateValue); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } + LOG.info("Insert jobKey: {} stateTypes: {} from database.", jobKey, createdStateTypes); + } + + public void updateData( + String jobKey, List<StateType> updatedStateTypes, Map<StateType, String> data) + throws Exception { + var query = + "UPDATE t_flink_autoscaler_state_store set state_value = ? where job_key = ? and state_type = ?"; + + try (var pstmt = conn.prepareStatement(query)) { + for (var stateType : updatedStateTypes) { + String stateValue = data.get(stateType); + checkState( + stateValue != null, + "The state value shouldn't be null during inserting. " + + "It may be a bug, please raise a JIRA to Flink Community."); + pstmt.setString(1, stateValue); + pstmt.setString(2, jobKey); + pstmt.setString(3, stateType.getIdentifier()); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java new file mode 100644 index 00000000..1e237a47 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** The jdbc state store. */ +public class JdbcStateStore { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcStateStore.class); + + private final ConcurrentHashMap<String, JobStateView> cache = new ConcurrentHashMap<>(); + + private final JdbcStateInteractor jdbcStateInteractor; + + public JdbcStateStore(JdbcStateInteractor jdbcStateInteractor) { + this.jdbcStateInteractor = jdbcStateInteractor; + } + + protected void putSerializedState(String jobKey, StateType stateType, String value) { + getJobStateView(jobKey).put(stateType, value); + } + + protected Optional<String> getSerializedState(String jobKey, StateType stateType) { + return Optional.ofNullable(getJobStateView(jobKey).get(stateType)); + } + + protected void removeSerializedState(String jobKey, StateType stateType) { + getJobStateView(jobKey).remove(stateType); + } + + public void flush(String jobKey) throws Exception { + JobStateView jobStateView = cache.get(jobKey); + if (jobStateView == null) { + LOG.debug("The JobStateView doesn't exist, so skip the flush."); + return; + } + try { + jobStateView.flush(); + } catch (Exception e) { + LOG.error( + "Error while flush autoscaler info to database, invalidating to clear the cache", + e); + removeInfoFromCache(jobKey); + throw e; + } + } + + public void removeInfoFromCache(String jobKey) { + cache.remove(jobKey); + } + + public void 
clearAll(String jobKey) { + getJobStateView(jobKey).clear(); + } + + private JobStateView getJobStateView(String jobKey) { + return cache.computeIfAbsent( + jobKey, + (id) -> { + try { + return createJobStateView(jobKey); + } catch (Exception exception) { + throw new RuntimeException( + "Meet exception during create job state view.", exception); + } + }); + } + + private JobStateView createJobStateView(String jobKey) throws Exception { + return new JobStateView(jdbcStateInteractor, jobKey); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java new file mode 100644 index 00000000..f8d52417 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.annotation.VisibleForTesting; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED; +import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE; +import static org.apache.flink.util.Preconditions.checkState; + +/** The view of job state. */ +@NotThreadSafe +public class JobStateView { + + /** + * The state of state type about the cache and database. + * + * <p>Note: {@link #inLocally} and {@link #inDatabase} are only for understand, we don't use + * them. + */ + @SuppressWarnings("unused") + enum State { + + /** State doesn't exist at database, and it's not used so far, so it's not needed. */ + NOT_NEEDED(false, false, false), + /** State is only stored locally, not created in JDBC database yet. */ + NEEDS_CREATE(true, false, true), + /** State exists in JDBC database but there are newer local changes. */ + NEEDS_UPDATE(true, true, true), + /** State is stored locally and in database, and they are same. */ + UP_TO_DATE(true, true, false), + /** State is stored in database, but it's deleted in local. */ + NEEDS_DELETE(false, true, true); + + /** The data of this state type is stored in locally when it is true. */ + private final boolean inLocally; + + /** The data of this state type is stored in database when it is true. 
*/ + private final boolean inDatabase; + + /** The data of this state type is stored in database when it is true. */ + private final boolean needFlush; + + State(boolean inLocally, boolean inDatabase, boolean needFlush) { + this.inLocally = inLocally; + this.inDatabase = inDatabase; + this.needFlush = needFlush; + } + + public boolean isNeedFlush() { + return needFlush; + } + } + + /** Transition old state to the new state when some operations happen to cache or database. */ + private static class StateTransitioner { + + /** The transition when put data to cache. */ + @Nonnull + public State putTransition(@Nonnull State oldState) { + switch (oldState) { + case NOT_NEEDED: + case NEEDS_CREATE: + return NEEDS_CREATE; + case NEEDS_UPDATE: + case UP_TO_DATE: + case NEEDS_DELETE: + return NEEDS_UPDATE; + default: + throw new IllegalArgumentException( + String.format("Unknown state : %s.", oldState)); + } + } + + /** The transition when delete data from cache. */ + @Nonnull + public State deleteTransition(@Nonnull State oldState) { + switch (oldState) { + case NOT_NEEDED: + case NEEDS_CREATE: + return NOT_NEEDED; + case NEEDS_UPDATE: + case UP_TO_DATE: + case NEEDS_DELETE: + return NEEDS_DELETE; + default: + throw new IllegalArgumentException( + String.format("Unknown state : %s.", oldState)); + } + } + + /** The transition when flush data from cache to database. 
*/ + @Nonnull + public State flushTransition(@Nonnull State oldState) { + switch (oldState) { + case NOT_NEEDED: + case NEEDS_DELETE: + return NOT_NEEDED; + case NEEDS_CREATE: + case NEEDS_UPDATE: + case UP_TO_DATE: + return UP_TO_DATE; + default: + throw new IllegalArgumentException( + String.format("Unknown state : %s.", oldState)); + } + } + } + + private static final StateTransitioner STATE_TRANSITIONER = new StateTransitioner(); + + private final JdbcStateInteractor jdbcStateInteractor; + private final String jobKey; + private final Map<StateType, String> data; + + /** + * The state is maintained for each state type, which means that part of state types of current + * job are stored in the database, but the rest of the state types may have been created in the + * database. + */ + private final Map<StateType, State> states; + + public JobStateView(JdbcStateInteractor jdbcStateInteractor, String jobKey) throws Exception { + this.jdbcStateInteractor = jdbcStateInteractor; + this.jobKey = jobKey; + this.data = jdbcStateInteractor.queryData(jobKey); + this.states = generateStates(this.data); + } + + private Map<StateType, State> generateStates(Map<StateType, String> data) { + final var states = new HashMap<StateType, State>(); + for (StateType stateType : StateType.values()) { + if (data.containsKey(stateType)) { + states.put(stateType, UP_TO_DATE); + } else { + states.put(stateType, NOT_NEEDED); + } + } + return states; + } + + public String get(StateType stateType) { + return data.get(stateType); + } + + public void put(StateType stateType, String value) { + data.put(stateType, value); + updateState(stateType, STATE_TRANSITIONER::putTransition); + } + + public void remove(StateType stateType) { + var oldKey = data.remove(stateType); + if (oldKey == null) { + return; + } + updateState(stateType, STATE_TRANSITIONER::deleteTransition); + } + + public void clear() { + if (data.isEmpty()) { + return; + } + var iterator = data.keySet().iterator(); + while 
(iterator.hasNext()) { + var stateType = iterator.next(); + iterator.remove(); + updateState(stateType, STATE_TRANSITIONER::deleteTransition); + } + } + + public void flush() throws Exception { + if (states.values().stream().noneMatch(State::isNeedFlush)) { + // No any state needs to be flushed. + return; + } + + // Build the data that need to be flushed. + var flushData = new HashMap<State, List<StateType>>(3); + for (Map.Entry<StateType, State> stateEntry : states.entrySet()) { + State state = stateEntry.getValue(); + if (!state.isNeedFlush()) { + continue; + } + StateType stateType = stateEntry.getKey(); + flushData.compute( + state, + (st, list) -> { + if (list == null) { + list = new LinkedList<>(); + } + list.add(stateType); + return list; + }); + } + + for (var entry : flushData.entrySet()) { + State state = entry.getKey(); + List<StateType> stateTypes = entry.getValue(); + switch (state) { + case NEEDS_CREATE: + jdbcStateInteractor.createData(jobKey, stateTypes, data); + break; + case NEEDS_DELETE: + jdbcStateInteractor.deleteData(jobKey, stateTypes); + break; + case NEEDS_UPDATE: + jdbcStateInteractor.updateData(jobKey, stateTypes, data); + break; + default: + throw new IllegalStateException(String.format("Unknown state : %s", state)); + } + for (var stateType : stateTypes) { + updateState(stateType, STATE_TRANSITIONER::flushTransition); + } + } + } + + private void updateState(StateType stateType, Function<State, State> stateTransitioner) { + states.compute( + stateType, + (type, oldState) -> { + checkState( + oldState != null, + "The state of each state type should be maintained in states. 
" + + "It may be a bug, please raise a JIRA to Flink Community."); + return stateTransitioner.apply(oldState); + }); + } + + @VisibleForTesting + public Map<StateType, String> getDataReadOnly() { + return Collections.unmodifiableMap(data); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java new file mode 100644 index 00000000..728afaa2 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.util.StringUtils; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The state type. */ +public enum StateType { + SCALING_HISTORY("scalingHistory"), + SCALING_TRACKING("scalingTracking"), + COLLECTED_METRICS("collectedMetrics"), + PARALLELISM_OVERRIDES("parallelismOverrides"); + + /** + * The identifier of each state type, it will be used to store. Please ensure the identifier is + * different for each state type. 
+ */ + private final String identifier; + + StateType(String identifier) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(identifier) && identifier.length() < 30, + "Ensure the identifier length is less than 30 to reduce the storage cost."); + this.identifier = identifier; + } + + public String getIdentifier() { + return identifier; + } + + public static StateType createFromIdentifier(String identifier) { + for (StateType stateType : StateType.values()) { + if (stateType.getIdentifier().equals(identifier)) { + return stateType; + } + } + + throw new IllegalArgumentException( + String.format("Unknown StateType identifier : %s.", identifier)); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql new file mode 100644 index 00000000..c8defefe --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* Autoscaler state table: each row holds the serialized value of one state type of one job. */ +CREATE TABLE t_flink_autoscaler_state_store +( + id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), + update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + job_key VARCHAR(191) NOT NULL, + state_type VARCHAR(100) NOT NULL, + state_value CLOB NOT NULL, + PRIMARY KEY (id), + /* At most one row per (job_key, state_type), matching the MySQL and PostgreSQL schemas, so a duplicate insert fails instead of adding a second row. */ + CONSTRAINT un_job_state_type_inx UNIQUE (job_key, state_type) +); diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql new file mode 100644 index 00000000..7a6c2d04 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* Creates the database and the state table; both statements use IF NOT EXISTS so the script can be re-run without failing. */ +create database if not exists `flink_autoscaler` character set utf8mb4 collate utf8mb4_general_ci; + +use `flink_autoscaler`; + +/* One row per (job_key, state_type), enforced by the unique key; update_time is refreshed by MySQL on every update of the row. */ +create table if not exists `t_flink_autoscaler_state_store` +( + `id` bigint not null auto_increment, + `update_time` datetime not null default current_timestamp on update current_timestamp comment 'update time', + `job_key` varchar(191) not null comment 'The job key', + `state_type` varchar(100) not null comment 'The state type', + `state_value` longtext not null comment 'The real state', + primary key (`id`) using btree, + unique key `un_job_state_type_inx` (`job_key`,`state_type`) using btree +) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci; + diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql new file mode 100644 index 00000000..d9fadfc0 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* Creates the flink_autoscaler database and switches the session to it; \c is a psql meta-command, so this script is meant to be run through psql. */ +CREATE DATABASE flink_autoscaler; +\c flink_autoscaler; + +/* State table: one row per (job_key, state_type), enforced by the UNIQUE constraint below. */ +CREATE TABLE t_flink_autoscaler_state_store +( + id BIGSERIAL NOT NULL, + update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + job_key TEXT NOT NULL, + state_type TEXT NOT NULL, + state_value TEXT NOT NULL, + PRIMARY KEY (id), + UNIQUE (job_key, state_type) +); + +/* Trigger function: stamps update_time with CURRENT_TIMESTAMP on the row being written and returns that row. */ +CREATE OR REPLACE FUNCTION update_flink_autoscaler_update_time_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.update_time = CURRENT_TIMESTAMP; +RETURN NEW; +END; +$$ language 'plpgsql'; + +/* Runs the function before every row update of the state table, so update_time tracks the last modification. */ +CREATE TRIGGER update_t_flink_autoscaler_state_store_modtime + BEFORE UPDATE ON t_flink_autoscaler_state_store + FOR EACH ROW + EXECUTE FUNCTION update_flink_autoscaler_update_time_column(); + diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java new file mode 100644 index 00000000..da737a54 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; +import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS; +import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY; +import static org.assertj.core.api.Assertions.assertThat; + +/** The abstract IT case for {@link JdbcStateInteractor}. */ +abstract class AbstractJdbcStateInteractorITCase implements DatabaseTest { + + @Test + void testAllOperations() throws Exception { + var jobKey = "jobKey"; + var value1 = "value1"; + var value2 = "value2"; + var value3 = "value3"; + try (var conn = getConnection()) { + var jdbcStateInteractor = new JdbcStateInteractor(conn); + assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty(); + + // Test for creating data. + jdbcStateInteractor.createData( + jobKey, + List.of(COLLECTED_METRICS, SCALING_HISTORY), + Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2)); + assertThat(jdbcStateInteractor.queryData(jobKey)) + .isEqualTo(Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2)); + + // Test for updating data. + jdbcStateInteractor.updateData( + jobKey, + List.of(COLLECTED_METRICS), + Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2)); + assertThat(jdbcStateInteractor.queryData(jobKey)) + .isEqualTo(Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2)); + + // Test for deleting data. 
+ jdbcStateInteractor.deleteData(jobKey, List.of(COLLECTED_METRICS)); + assertThat(jdbcStateInteractor.queryData(jobKey)) + .isEqualTo(Map.of(SCALING_HISTORY, value2)); + jdbcStateInteractor.deleteData(jobKey, List.of(SCALING_HISTORY)); + assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty(); + } + } +} + +/** Test {@link JdbcStateInteractor} via Derby database. */ +class DerbyJdbcStateInteractorITCase extends AbstractJdbcStateInteractorITCase + implements DerbyTestBase {} + +/** Test {@link JdbcStateInteractor} via MySQL 5.6.x. */ +class MySQL56JdbcStateInteractorITCase extends AbstractJdbcStateInteractorITCase + implements MySQL56TestBase {} + +/** Test {@link JdbcStateInteractor} via MySQL 5.7.x. */ +class MySQL57JdbcStateInteractorITCase extends AbstractJdbcStateInteractorITCase + implements MySQL57TestBase {} + +/** Test {@link JdbcStateInteractor} via MySQL 8.x. */ +class MySQL8JdbcStateInteractorITCase extends AbstractJdbcStateInteractorITCase + implements MySQL8TestBase {} + +/** Test {@link JdbcStateInteractor} via Postgre SQL. */ +class PostgreSQLJdbcStateInteractorITCase extends AbstractJdbcStateInteractorITCase + implements PostgreSQLTestBase {} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java new file mode 100644 index 00000000..5ab47137 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; +import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS; +import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY; +import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** The abstract IT case for {@link JdbcStateStore}. 
*/ +abstract class AbstractJdbcStateStoreITCase implements DatabaseTest { + + private static final String DEFAULT_JOB_KEY = "jobKey"; + private Connection conn; + private CountableJdbcStateInteractor jdbcStateInteractor; + private JdbcStateStore jdbcStateStore; + + @BeforeEach + void beforeEach() throws Exception { + this.conn = getConnection(); + this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn); + this.jdbcStateStore = new JdbcStateStore(jdbcStateInteractor); + } + + @AfterEach + void afterEach() throws SQLException { + if (conn != null) { + conn.close(); + } + } + + @Test + void testCaching() throws Exception { + var value1 = "value1"; + var value2 = "value2"; + var value3 = "value3"; + + jdbcStateInteractor.assertCountableJdbcInteractor(0, 0, 0, 0); + + // Query from database. + jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // The rest of state types of same job key shouldn't query database. + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY)).isEmpty(); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Putting does not go to database, unless flushing. + jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value1); + jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY, value2); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Flush together! Create counter is one. 
+ jdbcStateStore.flush(DEFAULT_JOB_KEY); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Get + assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1); + assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2); + var job2 = "job2"; + assertThat(jdbcStateStore.getSerializedState(job2, COLLECTED_METRICS)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(2, 0, 0, 1); + assertThat(jdbcStateStore.getSerializedState(job2, SCALING_HISTORY)).isEmpty(); + + // Put and flush state for job2 + jdbcStateStore.putSerializedState(job2, SCALING_TRACKING, value3); + jdbcStateStore.flush(job2); + jdbcStateInteractor.assertCountableJdbcInteractor(2, 0, 0, 2); + + // Build the new JdbcStateStore + var newJdbcStore = new JdbcStateStore(jdbcStateInteractor); + assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)) + .hasValue(value1); + assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY)) + .hasValue(value2); + jdbcStateInteractor.assertCountableJdbcInteractor(3, 0, 0, 2); + + assertThat(newJdbcStore.getSerializedState(job2, SCALING_TRACKING)).hasValue(value3); + jdbcStateInteractor.assertCountableJdbcInteractor(4, 0, 0, 2); + + // Removing the data from cache and query from database again. + newJdbcStore.removeInfoFromCache(job2); + assertThat(newJdbcStore.getSerializedState(job2, SCALING_TRACKING)).hasValue(value3); + jdbcStateInteractor.assertCountableJdbcInteractor(5, 0, 0, 2); + } + + @Test + void testDeleting() throws Exception { + var value1 = "value1"; + + jdbcStateInteractor.assertCountableJdbcInteractor(0, 0, 0, 0); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Get from cache, and it shouldn't exist in database. 
+ jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value1); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)) + .hasValue(value1); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Deleting before flushing + jdbcStateStore.removeSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Flush method shouldn't flush any data. + jdbcStateStore.flush(DEFAULT_JOB_KEY); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Put and flush data to database. + var value2 = "value2"; + jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value2); + jdbcStateStore.flush(DEFAULT_JOB_KEY); + assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value2); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Deleting after flushing, data is deleted in cache, but it still exists in database. 
+ jdbcStateStore.removeSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value2); + + // Flushing + jdbcStateStore.flush(DEFAULT_JOB_KEY); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 0, 1); + + // Get from database for a new JdbcStateStore. + var newJdbcStore = new JdbcStateStore(jdbcStateInteractor); + assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + } + + @Test + void testErrorHandlingDuringFlush() throws Exception { + var value1 = "value1"; + var value2 = "value2"; + jdbcStateInteractor.assertCountableJdbcInteractor(0, 0, 0, 0); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + + // Modify the database directly. + var tmpJdbcInteractor = new JdbcStateInteractor(conn); + tmpJdbcInteractor.createData( + DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1)); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1); + + // Cache cannot read data of database. + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Create it with SQLException due to the data has already existed in database. + jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value2); + assertThatThrownBy(() -> jdbcStateStore.flush(DEFAULT_JOB_KEY)) + .hasCauseInstanceOf(SQLException.class); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Get normally. 
+ assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)) + .hasValue(value1); + jdbcStateInteractor.assertCountableJdbcInteractor(2, 0, 0, 1); + } + + @Test + void testErrorHandlingDuringQuery() throws Exception { + var value1 = "value1"; + final var expectedException = new RuntimeException("Database isn't stable."); + + var exceptionableJdbcStateInteractor = + new CountableJdbcStateInteractor(conn) { + private final AtomicBoolean isFirst = new AtomicBoolean(true); + + @Override + public Map<StateType, String> queryData(String jobKey) throws Exception { + if (isFirst.get()) { + isFirst.set(false); + throw expectedException; + } + return super.queryData(jobKey); + } + }; + + var exceptionableJdbcStore = new JdbcStateStore(exceptionableJdbcStateInteractor); + + // First get will fail. + jdbcStateInteractor.assertCountableJdbcInteractor(0, 0, 0, 0); + assertThatThrownBy( + () -> + exceptionableJdbcStore.getSerializedState( + DEFAULT_JOB_KEY, COLLECTED_METRICS)) + .rootCause() + .isSameAs(expectedException); + + // It's recovered. + assertThat(exceptionableJdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)) + .isEmpty(); + exceptionableJdbcStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value1); + exceptionableJdbcStore.flush(DEFAULT_JOB_KEY); + assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1); + } + + @Test + void testDiscardAllState() throws Exception { + var value1 = "value1"; + var value2 = "value2"; + var value3 = "value3"; + + // Put and flush all state types first. 
+ jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value1); + jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY, value2); + jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING, value3); + jdbcStateStore.flush(DEFAULT_JOB_KEY); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1); + assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2); + assertStateValueForCacheAndDatabase(SCALING_TRACKING, value3); + + // Clear all in cache. + jdbcStateStore.clearAll(DEFAULT_JOB_KEY); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY)).isEmpty(); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, SCALING_HISTORY)).hasValue(value2); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, SCALING_TRACKING)).hasValue(value3); + + // Flush! 
+ jdbcStateStore.flush(DEFAULT_JOB_KEY); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY)).isEmpty(); + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, SCALING_HISTORY)).isEmpty(); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, SCALING_TRACKING)).isEmpty(); + } + + private void assertStateValueForCacheAndDatabase(StateType stateType, String expectedValue) + throws Exception { + assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, stateType)) + .hasValue(expectedValue); + assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, stateType)).hasValue(expectedValue); + } + + private Optional<String> getValueFromDatabase(String jobKey, StateType stateType) + throws Exception { + var jdbcInteractor = new JdbcStateInteractor(conn); + return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType)); + } +} + +/** Test {@link JdbcStateStore} via Derby database. */ +class DerbyJdbcStateStoreITCase extends AbstractJdbcStateStoreITCase implements DerbyTestBase {} + +/** Test {@link JdbcStateStore} via MySQL 5.6.x. */ +class MySQL56JdbcStateStoreITCase extends AbstractJdbcStateStoreITCase implements MySQL56TestBase {} + +/** Test {@link JdbcStateStore} via MySQL 5.7.x. */ +class MySQL57JdbcStateStoreITCase extends AbstractJdbcStateStoreITCase implements MySQL57TestBase {} + +/** Test {@link JdbcStateStore} via MySQL 8. */ +class MySQL8JdbcStateStoreITCase extends AbstractJdbcStateStoreITCase implements MySQL8TestBase {} + +/** Test {@link JdbcStateStore} via Postgre SQL. 
*/ +class PostgreSQLJdbcStoreITCase extends AbstractJdbcStateStoreITCase + implements PostgreSQLTestBase {} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java new file mode 100644 index 00000000..8fafc95d --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import java.sql.Connection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Countable {@link JdbcStateInteractor}. 
*/ +public class CountableJdbcStateInteractor extends JdbcStateInteractor { + + private final AtomicLong queryCounter; + private final AtomicLong deleteCounter; + private final AtomicLong createCounter; + private final AtomicLong updateCounter; + + public CountableJdbcStateInteractor(Connection conn) { + super(conn); + queryCounter = new AtomicLong(); + deleteCounter = new AtomicLong(); + createCounter = new AtomicLong(); + updateCounter = new AtomicLong(); + } + + @Override + public Map<StateType, String> queryData(String jobKey) throws Exception { + queryCounter.incrementAndGet(); + return super.queryData(jobKey); + } + + @Override + public void deleteData(String jobKey, List<StateType> deletedStateTypes) throws Exception { + deleteCounter.incrementAndGet(); + super.deleteData(jobKey, deletedStateTypes); + } + + @Override + public void createData( + String jobKey, List<StateType> createdStateTypes, Map<StateType, String> data) + throws Exception { + createCounter.incrementAndGet(); + super.createData(jobKey, createdStateTypes, data); + } + + @Override + public void updateData( + String jobKey, List<StateType> updatedStateTypes, Map<StateType, String> data) + throws Exception { + updateCounter.incrementAndGet(); + super.updateData(jobKey, updatedStateTypes, data); + } + + public void assertCountableJdbcInteractor( + long expectedQueryCounter, + long expectedDeleteCounter, + long expectedUpdateCounter, + long expectedCreateCounter) { + assertThat(queryCounter).hasValue(expectedQueryCounter); + assertThat(deleteCounter).hasValue(expectedDeleteCounter); + assertThat(updateCounter).hasValue(expectedUpdateCounter); + assertThat(createCounter).hasValue(expectedCreateCounter); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java new file mode 100644 index 00000000..a79c07bc 
--- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase; +import org.apache.flink.autoscaler.state.AbstractAutoScalerStateStoreTest; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.TreeMap; + +import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; +import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test for {@link JdbcAutoScalerStateStore}. + * + * <p>Note: {@link #testDiscardInvalidHistory} is referred from {@link + * KubernetesAutoScalerStateStoreTest}. 
+ */ +class JdbcAutoScalerStateStoreTest + extends AbstractAutoScalerStateStoreTest<JobID, JobAutoScalerContext<JobID>> + implements DerbyTestBase { + + private JdbcStateStore jdbcStateStore; + private JdbcAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> cachedStateStore; + + @Override + protected void preSetup() throws Exception { + jdbcStateStore = new JdbcStateStore(new JdbcStateInteractor(getConnection())); + cachedStateStore = new JdbcAutoScalerStateStore<>(jdbcStateStore); + } + + @Override + protected AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> + createPhysicalAutoScalerStateStore() throws Exception { + return new JdbcAutoScalerStateStore<>( + new JdbcStateStore(new JdbcStateInteractor(getConnection()))); + } + + @Override + protected AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> + createCachedAutoScalerStateStore() { + return cachedStateStore; + } + + @Override + protected JobAutoScalerContext<JobID> createJobContext() { + return createDefaultJobAutoScalerContext(); + } + + @Test + void testDiscardInvalidHistory() throws Exception { + jdbcStateStore.putSerializedState( + ctx.getJobKey().toString(), StateType.COLLECTED_METRICS, "invalid"); + jdbcStateStore.putSerializedState( + ctx.getJobKey().toString(), StateType.SCALING_HISTORY, "invalid2"); + + var now = Instant.now(); + + assertThat( + jdbcStateStore.getSerializedState( + ctx.getJobKey().toString(), StateType.COLLECTED_METRICS)) + .isPresent(); + assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty(); + assertThat( + jdbcStateStore.getSerializedState( + ctx.getJobKey().toString(), StateType.COLLECTED_METRICS)) + .isEmpty(); + + assertThat( + jdbcStateStore.getSerializedState( + ctx.getJobKey().toString(), StateType.SCALING_HISTORY)) + .isPresent(); + Assertions.assertEquals(new TreeMap<>(), getTrimmedScalingHistory(stateStore, ctx, now)); + assertThat( + jdbcStateStore.getSerializedState( + ctx.getJobKey().toString(), StateType.SCALING_HISTORY)) + .isEmpty(); + } +} 
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java new file mode 100644 index 00000000..3989dbff --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Optional; + +import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS; +import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY; +import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link JobStateView}.
*/ +class JobStateViewTest implements DerbyTestBase { + + private static final String DEFAULT_JOB_KEY = "jobKey"; + private Connection conn; + private CountableJdbcStateInteractor jdbcStateInteractor; + private JobStateView jobStateView; + + @BeforeEach + void beforeEach() throws Exception { + this.conn = getConnection(); + this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn); + this.jobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + } + + @AfterEach + void afterEach() throws SQLException { + if (conn != null) { + conn.close(); + } + } + + @Test + void testAllOperations() throws Exception { + // All state types should be fetched together to avoid querying the database frequently. + assertThat(jobStateView.get(COLLECTED_METRICS)).isNull(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + assertThat(jobStateView.get(SCALING_HISTORY)).isNull(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Put data into the cache; it should not exist in the database yet. + var value1 = "value1"; + jobStateView.put(COLLECTED_METRICS, value1); + assertThat(jobStateView.get(COLLECTED_METRICS)).isEqualTo(value1); + assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty(); + + var value2 = "value2"; + jobStateView.put(SCALING_HISTORY, value2); + assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value2); + assertThat(getValueFromDatabase(SCALING_HISTORY)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); + + // Test creating together. + jobStateView.flush(); + assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1); + assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Test updating data in the cache; the database is not updated yet.
+ var value3 = "value3"; + jobStateView.put(COLLECTED_METRICS, value3); + assertThat(jobStateView.get(COLLECTED_METRICS)).isEqualTo(value3); + assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value1); + + var value4 = "value4"; + jobStateView.put(SCALING_HISTORY, value4); + assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value4); + assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value2); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Test updating together. + jobStateView.flush(); + assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value3); + assertStateValueForCacheAndDatabase(SCALING_HISTORY, value4); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 1, 1); + + // Test deleting data from the cache; the database rows are not deleted yet. + jobStateView.remove(COLLECTED_METRICS); + assertThat(jobStateView.get(COLLECTED_METRICS)).isNull(); + assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value3); + + jobStateView.remove(SCALING_HISTORY); + assertThat(jobStateView.get(SCALING_HISTORY)).isNull(); + assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value4); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 1, 1); + + // Test deleting together. + jobStateView.flush(); + assertThat(jobStateView.get(COLLECTED_METRICS)).isNull(); + assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty(); + assertThat(jobStateView.get(SCALING_HISTORY)).isNull(); + assertThat(getValueFromDatabase(SCALING_HISTORY)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 1, 1); + } + + @Test + void testAvoidUnnecessaryFlushes() throws Exception { + var value1 = "value1"; + jobStateView.put(COLLECTED_METRICS, value1); + jobStateView.flush(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Avoid unnecessary flush for creating. + jobStateView.flush(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Avoid unnecessary flush for deleting.
+ jobStateView.clear(); + jobStateView.flush(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 0, 1); + jobStateView.flush(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 0, 1); + + // Avoid unnecessary flush even if clear is called again on already-empty state. + jobStateView.clear(); + jobStateView.flush(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 0, 1); + } + + @Test + void testCreateDeleteAndUpdateWorkTogether() throws Exception { + var value1 = "value1"; + var value2 = "value2"; + var value3 = "value3"; + var value4 = "value4"; + // Create 2 state types first. + jobStateView.put(COLLECTED_METRICS, value1); + jobStateView.put(SCALING_HISTORY, value2); + jobStateView.flush(); + assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1); + assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Delete one, update one and create one. + jobStateView.remove(COLLECTED_METRICS); + jobStateView.put(SCALING_HISTORY, value3); + jobStateView.put(SCALING_TRACKING, value4); + + assertThat(jobStateView.get(COLLECTED_METRICS)).isNull(); + assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value1); + assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value3); + assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value2); + assertThat(jobStateView.get(SCALING_TRACKING)).isEqualTo(value4); + assertThat(getValueFromDatabase(SCALING_TRACKING)).isEmpty(); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1); + + // Flush!
+ jobStateView.flush(); + assertThat(jobStateView.get(COLLECTED_METRICS)).isNull(); + assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty(); + assertStateValueForCacheAndDatabase(SCALING_HISTORY, value3); + assertStateValueForCacheAndDatabase(SCALING_TRACKING, value4); + jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 1, 2); + + // Build a new JobStateView and check that it sees the flushed state. + var newJobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY); + assertThat(newJobStateView.get(COLLECTED_METRICS)).isNull(); + assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty(); + assertThat(newJobStateView.get(SCALING_HISTORY)).isEqualTo(value3); + assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value3); + assertThat(newJobStateView.get(SCALING_TRACKING)).isEqualTo(value4); + assertThat(getValueFromDatabase(SCALING_TRACKING)).hasValue(value4); + jdbcStateInteractor.assertCountableJdbcInteractor(2, 1, 1, 2); + } + + private void assertStateValueForCacheAndDatabase(StateType stateType, String expectedValue) + throws Exception { + assertThat(jobStateView.get(stateType)).isEqualTo(expectedValue); + assertThat(getValueFromDatabase(stateType)).hasValue(expectedValue); + } + + private Optional<String> getValueFromDatabase(StateType stateType) throws Exception { + var jdbcInteractor = new JdbcStateInteractor(conn); + return Optional.ofNullable(jdbcInteractor.queryData(DEFAULT_JOB_KEY).get(stateType)); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/StateTypeTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/StateTypeTest.java new file mode 100644 index 00000000..ab92458c --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/StateTypeTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.state; + +import org.junit.jupiter.api.Test; + +import java.util.HashSet; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link StateType}: identifiers are unique and can be parsed back to their constant. */ +class StateTypeTest { + + @Test + void testIdentifierAreDifferent() { + var identifierSet = new HashSet<String>(); + for (StateType stateType : StateType.values()) { + assertThat(identifierSet).doesNotContain(stateType.getIdentifier()); + identifierSet.add(stateType.getIdentifier()); + } + } + + @Test + void testIdentifierCanBeParsed() { + for (StateType stateType : StateType.values()) { + assertThat(StateType.createFromIdentifier(stateType.getIdentifier())) + .isSameAs(stateType); + } + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java new file mode 100644 index 00000000..261578b3 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases; + +import java.sql.Connection; + +/** Implemented by test bases that provide a JDBC connection to the database under test. */ +public interface DatabaseTest { + + Connection getConnection() throws Exception; +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java new file mode 100644 index 00000000..b04c4df7 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases.derby; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; + +/** JUnit 5 extension that creates, cleans and drops an in-memory Derby database for tests. */ +public class DerbyExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback { + + private static final List<String> TABLES = List.of("t_flink_autoscaler_state_store"); + private static final String JDBC_URL = "jdbc:derby:memory:test"; + + public Connection getConnection() throws Exception { + return DriverManager.getConnection(JDBC_URL); + } + + @Override + public void beforeAll(ExtensionContext extensionContext) throws Exception { + DriverManager.getConnection(String.format("%s;create=true", JDBC_URL)).close(); + + var stateStoreDDL = + "CREATE TABLE t_flink_autoscaler_state_store\n" + + "(\n" + + " id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" + + " update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" + + " job_key VARCHAR(191) NOT NULL,\n" + + " state_type VARCHAR(100) NOT NULL,\n" + + " state_value CLOB NOT NULL,\n" + + " PRIMARY KEY (id)\n" + + ")\n"; + + var createIndex = + "CREATE UNIQUE INDEX un_job_state_type_inx ON t_flink_autoscaler_state_store (job_key, state_type)"; + try (var conn = getConnection(); + var st =
conn.createStatement()) { + st.execute(stateStoreDDL); + st.execute(createIndex); + } + } + + @Override + public void afterAll(ExtensionContext extensionContext) throws Exception { + try (var conn = getConnection(); + var st = conn.createStatement()) { + for (var tableName : TABLES) { + st.executeUpdate(String.format("DROP TABLE %s", tableName)); + } + } + try { + DriverManager.getConnection(String.format("%s;shutdown=true", JDBC_URL)).close(); + } catch (SQLException ignored) { // Derby reports a completed shutdown by throwing SQLException. + } + } + + @Override + public void afterEach(ExtensionContext extensionContext) throws Exception { + // Delete every row (intentionally without WHERE) so that each test starts with empty tables. + try (var conn = getConnection(); + var st = conn.createStatement()) { + for (var tableName : TABLES) { + st.executeUpdate(String.format("DELETE from %s", tableName)); + } + } + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java new file mode 100644 index 00000000..e0ac3f76 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases.derby; + +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; + +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.sql.Connection; + +/** Test base that hands out connections from the registered {@link DerbyExtension}. */ +public interface DerbyTestBase extends DatabaseTest { + + @RegisterExtension DerbyExtension DERBY_EXTENSION = new DerbyExtension(); + + @Override + default Connection getConnection() throws Exception { + return DERBY_EXTENSION.getConnection(); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java new file mode 100644 index 00000000..f2b1858c --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql; + +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; + +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.sql.Connection; + +/** Test base that hands out connections from a {@link MySQLExtension} created for MySQL 5.6.51. */ +public interface MySQL56TestBase extends DatabaseTest { + + @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.6.51"); + + default Connection getConnection() throws Exception { + return MYSQL_EXTENSION.getConnection(); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java new file mode 100644 index 00000000..0b8a6968 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql; + +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; + +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.sql.Connection; + +/** Test base that hands out connections from a {@link MySQLExtension} created for MySQL 5.7.41. */ +public interface MySQL57TestBase extends DatabaseTest { + + @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.7.41"); + + default Connection getConnection() throws Exception { + return MYSQL_EXTENSION.getConnection(); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java new file mode 100644 index 00000000..daf77885 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql; + +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; + +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.sql.Connection; + +/** Test base that hands out connections from a {@link MySQLExtension} created for MySQL 8.0.32. */ +public interface MySQL8TestBase extends DatabaseTest { + + @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("8.0.32"); + + default Connection getConnection() throws Exception { + return MYSQL_EXTENSION.getConnection(); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java new file mode 100644 index 00000000..40c67a06 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.MySQLContainer; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.List; + +/** JUnit 5 extension that manages a MySQL test container and empties its tables after each test. */ +class MySQLExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback { + + private static final String MYSQL_INIT_SCRIPT = "test_schema/mysql_schema.sql"; + private static final String DATABASE_NAME = "flink_autoscaler"; + private static final String USER_NAME = "root"; + private static final String PASSWORD = "123456"; + private static final List<String> TABLES = List.of("t_flink_autoscaler_state_store"); + + private final MySQLContainer<?> container; + + public MySQLExtension(String mysqlVersion) { + this.container = + new MySQLContainer<>(String.format("mysql:%s", mysqlVersion)) + .withCommand("--character-set-server=utf8") + .withDatabaseName(DATABASE_NAME) + .withUsername(USER_NAME) + .withPassword(PASSWORD) + .withInitScript(MYSQL_INIT_SCRIPT) + .withEnv("MYSQL_ROOT_HOST", "%"); + } + + public Connection getConnection() throws Exception { + return DriverManager.getConnection( + container.getJdbcUrl(), container.getUsername(), container.getPassword()); + } + + @Override + public void beforeAll(ExtensionContext extensionContext) { + container.start(); + } + + @Override + public void afterAll(ExtensionContext extensionContext) { + container.stop(); + } + + @Override + public void afterEach(ExtensionContext extensionContext) throws Exception { + try (var conn = getConnection(); + var st = conn.createStatement()) { + for (var tableName : TABLES) { + st.executeUpdate(String.format("DELETE from %s", tableName)); + } + } + } +} diff --git
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java new file mode 100644 index 00000000..de4d6e37 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.PostgreSQLContainer; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.List; + +/** JUnit 5 extension that manages a PostgreSQL test container and empties its tables after each test.
*/ +class PostgreSQLExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback { + + private static final String INIT_SCRIPT = "test_schema/postgres_schema.sql"; + private static final String DATABASE_NAME = "flink_autoscaler"; + private static final String USER_NAME = "root"; + private static final String PASSWORD = "123456"; + private static final List<String> TABLES = List.of("t_flink_autoscaler_state_store"); + + private final PostgreSQLContainer<?> container; + + public PostgreSQLExtension(String postgresqlVersion) { + this.container = + new PostgreSQLContainer<>(String.format("postgres:%s", postgresqlVersion)) + .withDatabaseName(DATABASE_NAME) + .withUsername(USER_NAME) + .withPassword(PASSWORD) + .withInitScript(INIT_SCRIPT) + .withEnv("POSTGRES_MAX_CONNECTIONS", "10"); // NOTE(review): not a documented variable of the official postgres image; confirm it has any effect + } + + public Connection getConnection() throws Exception { + return DriverManager.getConnection( + container.getJdbcUrl(), container.getUsername(), container.getPassword()); + } + + @Override + public void beforeAll(ExtensionContext extensionContext) { + container.start(); + } + + @Override + public void afterAll(ExtensionContext extensionContext) { + container.stop(); + } + + @Override + public void afterEach(ExtensionContext extensionContext) throws Exception { + try (var conn = getConnection(); + var st = conn.createStatement()) { + for (var tableName : TABLES) { + st.executeUpdate(String.format("DELETE from %s", tableName)); + } + } + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java new file mode 100644 index 00000000..1e27a6e1 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres; + +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; + +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.sql.Connection; + +/** Test base that hands out connections from a {@link PostgreSQLExtension} created for 15.1. */ +public interface PostgreSQLTestBase extends DatabaseTest { + + @RegisterExtension PostgreSQLExtension POSTGRE_SQL_EXTENSION = new PostgreSQLExtension("15.1"); + + default Connection getConnection() throws Exception { + return POSTGRE_SQL_EXTENSION.getConnection(); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties b/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties new file mode 100644 index 00000000..5866179a --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +# Log everything at INFO level and above to the console +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] %msg%n%throwable} + diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql new file mode 100644 index 00000000..825038d5 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +create table `t_flink_autoscaler_state_store` +( + `id` bigint not null auto_increment, + `update_time` datetime not null default current_timestamp on update current_timestamp comment 'update time', + `job_key` varchar(191) not null comment 'The job key', + `state_type` varchar(100) not null comment 'The state type', + `state_value` longtext not null comment 'The real state', + primary key (`id`) using btree, + unique key `un_job_state_type_inx` (`job_key`,`state_type`) using btree +) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci; + diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql new file mode 100644 index 00000000..beb90a75 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE t_flink_autoscaler_state_store +( + id BIGSERIAL NOT NULL, + update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + job_key TEXT NOT NULL, + state_type TEXT NOT NULL, + state_value TEXT NOT NULL, + PRIMARY KEY (id), + UNIQUE (job_key, state_type) +); diff --git a/pom.xml b/pom.xml index 42adf488..716fb9ce 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ under the License. <module>flink-kubernetes-docs</module> <module>flink-autoscaler</module> <module>flink-autoscaler-standalone</module> + <module>flink-autoscaler-plugin-jdbc</module> <module>examples/flink-sql-runner-example</module> <module>examples/flink-beam-example</module> <module>examples/kubernetes-client-examples</module>