This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd91affd8040123e2f757b24859b7dad33e09532 Author: Yu Li <[email protected]> AuthorDate: Thu Jun 11 19:32:39 2020 +0800 [FLINK-18242][state-backend-rocksdb] Separate RocksDBOptionsFactory from OptionsFactory This closes #12673. --- flink-python/pyflink/datastream/state_backend.py | 8 ++-- .../streaming/state/OptionsFactoryAdapter.java | 55 ++++++++++++++++++++++ .../streaming/state/RocksDBOptionsFactory.java | 28 +---------- .../state/RocksDBOptionsFactoryAdapter.java | 4 +- .../streaming/state/RocksDBStateBackend.java | 10 ++-- .../state/RocksDBStateBackendConfigTest.java | 11 ++++- 6 files changed, 78 insertions(+), 38 deletions(-) diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py index d894137..46ccf66 100644 --- a/flink-python/pyflink/datastream/state_backend.py +++ b/flink-python/pyflink/datastream/state_backend.py @@ -682,11 +682,11 @@ class RocksDBStateBackend(StateBackend): The options factory must have a default constructor. """ gateway = get_gateway() - JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.OptionsFactory + JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory j_options_factory_clz = load_java_class(options_factory_class_name) if not get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz): - raise ValueError("The input class not implements OptionsFactory.") - self._j_rocks_db_state_backend.setOptions(j_options_factory_clz.newInstance()) + raise ValueError("The input class not implements RocksDBOptionsFactory.") + self._j_rocks_db_state_backend.setRocksDBOptions(j_options_factory_clz.newInstance()) def get_options(self): """ @@ -695,7 +695,7 @@ class RocksDBStateBackend(StateBackend): :return: The fully-qualified class name of the options factory in Java. 
""" - j_options_factory = self._j_rocks_db_state_backend.getOptions() + j_options_factory = self._j_rocks_db_state_backend.getRocksDBOptions() if j_options_factory is not None: return j_options_factory.getClass().getName() else: diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java new file mode 100644 index 0000000..666bc4b --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; + +import java.util.ArrayList; + +/** + * A conversion from {@link RocksDBOptionsFactory} to {@link OptionsFactory}. 
+ */ +public class OptionsFactoryAdapter implements OptionsFactory { + + private static final long serialVersionUID = 1L; + + private final RocksDBOptionsFactory rocksDBOptionsFactory; + + OptionsFactoryAdapter(RocksDBOptionsFactory rocksDBOptionsFactory) { + this.rocksDBOptionsFactory = rocksDBOptionsFactory; + } + + @Override + public DBOptions createDBOptions(DBOptions currentOptions) { + return rocksDBOptionsFactory.createDBOptions(currentOptions, new ArrayList<>()); + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { + return rocksDBOptionsFactory.createColumnOptions(currentOptions, new ArrayList<>()); + } + + @VisibleForTesting + RocksDBOptionsFactory getRocksDBOptionsFactory() { + return rocksDBOptionsFactory; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java index a5fd1e9..dffa9e9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java @@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import java.util.ArrayList; import java.util.Collection; /** @@ -32,7 +31,7 @@ import java.util.Collection; * <p>A typical pattern to use this OptionsFactory is as follows: * * <pre>{@code - * rocksDbBackend.setOptions(new RocksDBOptionsFactory() { + * rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() { * * public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) { * return currentOptions.setMaxOpenFiles(1024); @@ -49,8 +48,7 @@ import 
java.util.Collection; * }); * }</pre> */ -@SuppressWarnings("deprecation") -public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializable { +public interface RocksDBOptionsFactory extends java.io.Serializable { /** * This method should set the additional options on top of the current options object. @@ -92,26 +90,4 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) { return nativeMetricOptions; } - - // ------------------------------------------------------------------------ - // for compatibility - // ------------------------------------------------------------------------ - - /** - * Do not override these methods, they are only to maintain interface compatibility with - * prior versions. They will be removed in one of the next versions. - */ - @Override - default DBOptions createDBOptions(DBOptions currentOptions) { - return createDBOptions(currentOptions, new ArrayList<>()); - } - - /** - * Do not override these methods, they are only to maintain interface compatibility with - * prior versions. They will be removed in one of the next versions. 
- */ - @Override - default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { - return createColumnOptions(currentOptions, new ArrayList<>()); - } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java index 36e8c3f..7efc32d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java @@ -67,9 +67,9 @@ final class RocksDBOptionsFactoryAdapter implements ConfigurableRocksDBOptionsFa } @Nullable - public static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) { + static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) { return factory instanceof RocksDBOptionsFactoryAdapter ? 
((RocksDBOptionsFactoryAdapter) factory).optionsFactory - : factory; + : new OptionsFactoryAdapter(factory); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 873f57d..e9d27b6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -861,9 +861,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu */ @Deprecated public void setOptions(OptionsFactory optionsFactory) { - this.rocksDbOptionsFactory = optionsFactory instanceof RocksDBOptionsFactory - ? (RocksDBOptionsFactory) optionsFactory - : new RocksDBOptionsFactoryAdapter(optionsFactory); + this.rocksDbOptionsFactory = new RocksDBOptionsFactoryAdapter(optionsFactory); } /** @@ -874,7 +872,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu */ @Deprecated public OptionsFactory getOptions() { - return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory); + if (rocksDbOptionsFactory == null) { + return null; + } else { + return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory); + } } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 2167a00..1f691bf 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -552,7 +552,14 @@ public class RocksDBStateBackendConfigTest { rocksDbBackend = rocksDbBackend.configure(config, getClass().getClassLoader()); assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory); - assertTrue(rocksDbBackend.getOptions() instanceof TestOptionsFactory); + OptionsFactory optionsFactory = rocksDbBackend.getOptions(); + if (optionsFactory instanceof OptionsFactoryAdapter) { + RocksDBOptionsFactory rocksDBOptionsFactory = + ((OptionsFactoryAdapter) optionsFactory).getRocksDBOptionsFactory(); + assertTrue(rocksDBOptionsFactory instanceof TestOptionsFactory); + } else { + assertTrue(optionsFactory instanceof TestOptionsFactory); + } try (RocksDBResourceContainer optionsContainer = rocksDbBackend.createOptionsAndResourceContainer()) { DBOptions dbOptions = optionsContainer.getDbOptions(); @@ -640,7 +647,7 @@ public class RocksDBStateBackendConfigTest { assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled()); assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths()); - assertEquals(original.getOptions(), copy.getOptions()); + assertEquals(original.getRocksDBOptions(), copy.getRocksDBOptions()); assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions()); FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();
