This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new e13146f [FLINK-18242][state-backend-rocksdb] Remove the deprecated OptionsFactory and related classes
e13146f is described below
commit e13146f80114266aa34c9fe9f3dc27e87f7a7649
Author: Yu Li <[email protected]>
AuthorDate: Thu Jun 11 19:32:39 2020 +0800
[FLINK-18242][state-backend-rocksdb] Remove the deprecated OptionsFactory and related classes
This closes #12683.
---
.../generated/expert_rocksdb_section.html | 2 +-
.../generated/rocks_db_configuration.html | 2 +-
docs/ops/state/state_backends.md | 8 +-
docs/ops/state/state_backends.zh.md | 4 +-
flink-python/pyflink/datastream/state_backend.py | 8 +-
.../state/ConfigurableOptionsFactory.java | 39 ----------
.../state/DefaultConfigurableOptionsFactory.java | 6 +-
.../contrib/streaming/state/OptionsFactory.java | 69 -----------------
.../contrib/streaming/state/RocksDBOptions.java | 2 +-
.../streaming/state/RocksDBOptionsFactory.java | 30 +-------
.../state/RocksDBOptionsFactoryAdapter.java | 75 ------------------
.../streaming/state/RocksDBStateBackend.java | 24 ------
.../RocksDBOptionsFactoryCompatibilityTest.java | 89 ----------------------
.../state/RocksDBStateBackendConfigTest.java | 3 +-
14 files changed, 20 insertions(+), 341 deletions(-)
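
Migration note (illustrative only, not part of the commit): the sketch below
shows the shape of the change for code that still implements the removed
OptionsFactory. The real types are org.rocksdb.DBOptions and Flink's
RocksDBOptionsFactory; StubOptions, StubNativeHandle, OldStyleFactory,
NewStyleFactory, MigratedFactory and buildAndDispose are hypothetical
stand-ins so that the sketch compiles without Flink or RocksDB on the
classpath. It only illustrates the reason given in the removed Javadoc: the
old factory had no way to register native handles, while the remaining one
receives a handlesToClose collection.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class OptionsFactoryMigrationSketch {

    /** Hypothetical stand-in for a native-backed options object such as org.rocksdb.DBOptions. */
    static final class StubOptions {
        int maxOpenFiles = -1;

        StubOptions setMaxOpenFiles(int value) {
            this.maxOpenFiles = value;
            return this; // setters return the same object, so earlier settings are kept
        }
    }

    /** Hypothetical stand-in for a native resource that must be released explicitly. */
    static final class StubNativeHandle implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Shape of the removed interface: nowhere to hand native handles back to the caller. */
    interface OldStyleFactory {
        StubOptions createDBOptions(StubOptions currentOptions);
    }

    /** Shape of the remaining interface: handles are registered for later disposal. */
    interface NewStyleFactory {
        StubOptions createDBOptions(StubOptions currentOptions, Collection<AutoCloseable> handlesToClose);
    }

    /** A migrated factory: the same option tweak as before, plus handle registration. */
    static final class MigratedFactory implements NewStyleFactory {
        @Override
        public StubOptions createDBOptions(StubOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
            StubNativeHandle handle = new StubNativeHandle(); // stands in for e.g. a cache or filter object
            handlesToClose.add(handle);
            return currentOptions.setMaxOpenFiles(1024);
        }
    }

    /** Plays the caller's role in this sketch: builds the options, then closes every registered handle. */
    static List<AutoCloseable> buildAndDispose(NewStyleFactory factory, StubOptions options) {
        List<AutoCloseable> handles = new ArrayList<>();
        factory.createDBOptions(options, handles);
        for (AutoCloseable handle : handles) {
            try {
                handle.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close handle", e);
            }
        }
        return handles;
    }

    public static void main(String[] args) {
        StubOptions options = new StubOptions();
        List<AutoCloseable> handles = buildAndDispose(new MigratedFactory(), options);
        System.out.println("maxOpenFiles=" + options.maxOpenFiles + ", handles=" + handles.size());
    }
}
```

In real code the corresponding steps would be to add the handlesToClose
parameter to createDBOptions and createColumnOptions, and to call
setRocksDBOptions / getRocksDBOptions where setOptions / getOptions were used.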
diff --git a/docs/_includes/generated/expert_rocksdb_section.html b/docs/_includes/generated/expert_rocksdb_section.html
index d3cbfe1..b24058d 100644
--- a/docs/_includes/generated/expert_rocksdb_section.html
+++ b/docs/_includes/generated/expert_rocksdb_section.html
@@ -30,7 +30,7 @@
<td><h5>state.backend.rocksdb.predefined-options</h5></td>
<td style="word-wrap: break-word;">"DEFAULT"</td>
<td>String</td>
- <td>The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the OptionsFactory are applied on top of these predefined ones.</td>
+ <td>The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the RocksDBOptionsFactory are applied on top of these predefined ones.</td>
</tr>
</tbody>
</table>
diff --git a/docs/_includes/generated/rocks_db_configuration.html b/docs/_includes/generated/rocks_db_configuration.html
index ee45dc2..44ccc96 100644
--- a/docs/_includes/generated/rocks_db_configuration.html
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -54,7 +54,7 @@
<td><h5>state.backend.rocksdb.predefined-options</h5></td>
<td style="word-wrap: break-word;">"DEFAULT"</td>
<td>String</td>
- <td>The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the OptionsFactory are applied on top of these predefined ones.</td>
+ <td>The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the RocksDBOptionsFactory are applied on top of these predefined ones.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.timer-service.factory</h5></td>
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index c13c3d6..d989d29 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -243,7 +243,7 @@ For advanced tuning, Flink also provides two parameters to control the division
Moreover, the L0 level filter and index are pinned into the cache by default to mitigate performance problems,
more details please refer to the [RocksDB-documentation](https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks).
-<span class="label label-info">Note</span> When the above described mechanism (`cache` and `write buffer manager`) is enabled, it will override any customized settings for block caches and write buffers done via [`PredefinedOptions`](#predefined-per-columnfamily-options) and [`OptionsFactory`](#passing-options-factory-to-rocksdb).
+<span class="label label-info">Note</span> When the above described mechanism (`cache` and `write buffer manager`) is enabled, it will override any customized settings for block caches and write buffers done via [`PredefinedOptions`](#predefined-per-columnfamily-options) and [`RocksDBOptionsFactory`](#passing-options-factory-to-rocksdb).
<span class="label label-info">Note</span> *Expert Mode*: To control memory manually, you can set `state.backend.rocksdb.memory.managed` to `false` and configure RocksDB via [`ColumnFamilyOptions`](#passing-options-factory-to-rocksdb). Alternatively, you can use the above mentioned cache/buffer-manager mechanism, but set the memory size to a fixed amount independent of Flink's managed memory size (`state.backend.rocksdb.memory.fixed-per-slot` option). Note that in both cases, users need [...]
@@ -286,7 +286,7 @@ The default value for this option is `DEFAULT` which translates to `PredefinedOp
To manually control RocksDB's options, you need to configure an `RocksDBOptionsFactory`. This mechanism gives you fine-grained control over the settings of the Column Families, for example memory use, thread, compaction settings, etc. There is currently one Column Family per each state in each operator.
-There are two ways to pass an OptionsFactory to the RocksDB State Backend:
+There are two ways to pass a RocksDBOptionsFactory to the RocksDB State Backend:
- Configure options factory class name in the `flink-conf.yaml` via `state.backend.rocksdb.options-factory`.
@@ -302,7 +302,7 @@ allocating more memory than configured.
**Reading Column Family Options from flink-conf.yaml**
-When an `OptionsFactory` implements the `ConfigurableRocksDBOptionsFactory` interface, it can directly read settings from the configuration (`flink-conf.yaml`).
+When a `RocksDBOptionsFactory` implements the `ConfigurableRocksDBOptionsFactory` interface, it can directly read settings from the configuration (`flink-conf.yaml`).
The default value for `state.backend.rocksdb.options-factory` is in fact `org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory` which picks up all config options [defined here]({{ site.baseurl }}/ops/config.html#advanced-rocksdb-state-backends-options) by default. Hence, you can configure low-level Column Family options simply by turning off managed memory for RocksDB and putting the relevant entries in the configuration.
@@ -331,7 +331,7 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
}
@Override
- public OptionsFactory configure(Configuration configuration) {
+ public RocksDBOptionsFactory configure(Configuration configuration) {
this.blockCacheSize = configuration.getLong("my.custom.rocksdb.block.cache.size", DEFAULT_SIZE);
return this;
diff --git a/docs/ops/state/state_backends.zh.md b/docs/ops/state/state_backends.zh.md
index 59b5455..19bbf10 100644
--- a/docs/ops/state/state_backends.zh.md
+++ b/docs/ops/state/state_backends.zh.md
@@ -231,7 +231,7 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*(
- `state.backend.rocksdb.memory.high-prio-pool-ratio`,默认值 `0.1`,即 10% 的
block cache 内存会优先分配给索引及过滤器。
我们强烈建议不要将此值设置为零,以防止索引和过滤器被频繁踢出缓存而导致性能问题。此外,我们默认将L0级的过滤器和索引将被固定到缓存中以提高性能,更多详细信息请参阅
[RocksDB
文档](https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks)。
-<span class="label label-info">注意</span> 上述机制开启时将覆盖用户在
[`PredefinedOptions`](#predefined-per-columnfamily-options) 和
[`OptionsFactory`](#passing-options-factory-to-rocksdb) 中对 block cache 和 write
buffer 进行的配置。
+<span class="label label-info">注意</span> 上述机制开启时将覆盖用户在
[`PredefinedOptions`](#predefined-per-columnfamily-options) 和
[`RocksDBOptionsFactory`](#passing-options-factory-to-rocksdb) 中对 block cache 和
write buffer 进行的配置。
<span class="label label-info">注意</span> *仅面向专业用户*:若要手动控制内存,可以将
`state.backend.rocksdb.memory.managed` 设置为 `false`,并通过
[`ColumnFamilyOptions`](#passing-options-factory-to-rocksdb) 配置 RocksDB。
或者可以复用上述 cache/write-buffer-manager 机制,但将内存大小设置为与 Flink 的托管内存大小无关的固定大小(通过
`state.backend.rocksdb.memory.fixed-per-slot` 选项)。
@@ -318,7 +318,7 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
}
@Override
- public OptionsFactory configure(Configuration configuration) {
+ public RocksDBOptionsFactory configure(Configuration configuration) {
this.blockCacheSize = configuration.getLong("my.custom.rocksdb.block.cache.size", DEFAULT_SIZE);
return this;
diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py
index d975195..a6bd873 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -637,11 +637,11 @@ class RocksDBStateBackend(StateBackend):
The options factory must have a
default constructor.
"""
gateway = get_gateway()
- JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.OptionsFactory
+ JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
j_options_factory_clz = load_java_class(options_factory_class_name)
if not get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz):
- raise ValueError("The input class not implements OptionsFactory.")
- self._j_rocks_db_state_backend.setOptions(j_options_factory_clz.newInstance())
+ raise ValueError("The input class does not implement RocksDBOptionsFactory.")
+ self._j_rocks_db_state_backend.setRocksDBOptions(j_options_factory_clz.newInstance())
def get_options(self):
"""
@@ -650,7 +650,7 @@ class RocksDBStateBackend(StateBackend):
:return: The fully-qualified class name of the options factory in Java.
"""
- j_options_factory = self._j_rocks_db_state_backend.getOptions()
+ j_options_factory = self._j_rocks_db_state_backend.getRocksDBOptions()
if j_options_factory is not None:
return j_options_factory.getClass().getName()
else:
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
deleted file mode 100644
index 9256e59..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.ReadableConfig;
-
-/**
- * @deprecated Replaced by {@link ConfigurableRocksDBOptionsFactory}.
- */
-public interface ConfigurableOptionsFactory extends OptionsFactory {
-
- /**
- * Creates a variant of the options factory that applies additional configuration parameters.
- *
- * <p>If no configuration is applied, or if the method directly applies configuration values to
- * the (mutable) options factory object, this method may return the original options factory object.
- * Otherwise it typically returns a modified copy.
- *
- * @param configuration The configuration to pick the values from.
- * @return A reconfigured options factory.
- */
- OptionsFactory configure(ReadableConfig configuration);
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 88c8fad..bcfd87c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -336,10 +336,10 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableRocksDBOpt
* Creates a {@link DefaultConfigurableOptionsFactory} instance from a {@link ReadableConfig}.
*
* <p>If no options within {@link RocksDBConfigurableOptions} has ever been configured,
- * the created OptionsFactory would not override anything defined in {@link PredefinedOptions}.
+ * the created RocksDBOptionsFactory would not override anything defined in {@link PredefinedOptions}.
*
- * @param configuration Configuration to be used for the ConfigurableOptionsFactory creation
- * @return A ConfigurableOptionsFactory created from the given configuration
+ * @param configuration Configuration to be used for the ConfigurableRocksDBOptionsFactory creation
+ * @return A ConfigurableRocksDBOptionsFactory created from the given configuration
*/
@Override
public DefaultConfigurableOptionsFactory configure(ReadableConfig configuration) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
deleted file mode 100644
index 9118351..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-/**
- * @deprecated Use {@link RocksDBOptionsFactory} instead. This factory has no mechanism to register
- * native handles to be closed and is thus deprecated in favor or a new variant.
- */
-@Deprecated
-public interface OptionsFactory extends java.io.Serializable {
-
- /**
- * This method should set the additional options on top of the current options object.
- * The current options object may contain pre-defined options based on flags that have
- * been configured on the state backend.
- *
- * <p>It is important to set the options on the current object and return the result from
- * the setter methods, otherwise the pre-defined options may get lost.
- *
- * @param currentOptions The options object with the pre-defined options.
- * @return The options object on which the additional options are set.
- */
- DBOptions createDBOptions(DBOptions currentOptions);
-
- /**
- * This method should set the additional options on top of the current options object.
- * The current options object may contain pre-defined options based on flags that have
- * been configured on the state backend.
- *
- * <p>It is important to set the options on the current object and return the result from
- * the setter methods, otherwise the pre-defined options may get lost.
- *
- * @param currentOptions The options object with the pre-defined options.
- * @return The options object on which the additional options are set.
- */
- ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions);
-
- /**
- * This method should enable certain RocksDB metrics to be forwarded to
- * Flink's metrics reporter.
- *
- * <p>Enabling these monitoring options may degrade RockDB performance
- * and should be set with care.
- * @param nativeMetricOptions The options object with the pre-defined options.
- * @return The options object on which the additional options are set.
- */
- default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
- return nativeMetricOptions;
- }
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 457ca33..61bf85a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -74,7 +74,7 @@ public class RocksDBOptions {
.defaultValue(DEFAULT.name())
.withDescription(String.format("The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. " +
"Current supported candidate predefined-options are %s, %s, %s or %s. Note that user customized options and options " +
- "from the OptionsFactory are applied on top of these predefined ones.",
+ "from the RocksDBOptionsFactory are applied on top of these predefined ones.",
DEFAULT.name(), SPINNING_DISK_OPTIMIZED.name(), SPINNING_DISK_OPTIMIZED_HIGH_MEM.name(), FLASH_SSD_OPTIMIZED.name()));
/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index a5fd1e9..7e86d22 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import java.util.ArrayList;
import java.util.Collection;
/**
@@ -29,10 +28,10 @@ import java.util.Collection;
* Options have to be created lazily by this factory, because the {@code Options}
* class is not serializable and holds pointers to native code.
*
- * <p>A typical pattern to use this OptionsFactory is as follows:
+ * <p>A typical pattern to use this RocksDBOptionsFactory is as follows:
*
* <pre>{@code
- * rocksDbBackend.setOptions(new RocksDBOptionsFactory() {
+ * rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
*
* public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
* return currentOptions.setMaxOpenFiles(1024);
@@ -49,8 +48,7 @@ import java.util.Collection;
* });
* }</pre>
*/
-@SuppressWarnings("deprecation")
-public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializable {
+public interface RocksDBOptionsFactory extends java.io.Serializable {
/**
* This method should set the additional options on top of the current options object.
@@ -92,26 +90,4 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
return nativeMetricOptions;
}
-
- // ------------------------------------------------------------------------
- // for compatibility
- // ------------------------------------------------------------------------
-
- /**
- * Do not override these methods, they are only to maintain interface compatibility with
- * prior versions. They will be removed in one of the next versions.
- */
- @Override
- default DBOptions createDBOptions(DBOptions currentOptions) {
- return createDBOptions(currentOptions, new ArrayList<>());
- }
-
- /**
- * Do not override these methods, they are only to maintain interface compatibility with
- * prior versions. They will be removed in one of the next versions.
- */
- @Override
- default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
- return createColumnOptions(currentOptions, new ArrayList<>());
- }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
deleted file mode 100644
index 862832f..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.ReadableConfig;
-
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import javax.annotation.Nullable;
-
-import java.util.Collection;
-
-/**
- * A conversion from {@link OptionsFactory} to {@link RocksDBOptionsFactory}.
- */
-@SuppressWarnings("deprecation")
-final class RocksDBOptionsFactoryAdapter implements ConfigurableRocksDBOptionsFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final OptionsFactory optionsFactory;
-
- RocksDBOptionsFactoryAdapter(OptionsFactory optionsFactory) {
- this.optionsFactory = optionsFactory;
- }
-
- @Override
- public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
- return optionsFactory.createDBOptions(currentOptions);
- }
-
- @Override
- public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
- return optionsFactory.createColumnOptions(currentOptions);
- }
-
- @Override
- public RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
- return optionsFactory.createNativeMetricsOptions(nativeMetricOptions);
- }
-
- @Override
- public RocksDBOptionsFactory configure(ReadableConfig configuration) {
- if (optionsFactory instanceof ConfigurableOptionsFactory) {
- final OptionsFactory reconfigured = ((ConfigurableOptionsFactory) optionsFactory).configure(configuration);
- return reconfigured == optionsFactory ? this : new RocksDBOptionsFactoryAdapter(reconfigured);
- }
-
- return this;
- }
-
- @Nullable
- public static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
- return factory instanceof RocksDBOptionsFactoryAdapter
- ? ((RocksDBOptionsFactoryAdapter) factory).optionsFactory
- : factory;
- }
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 4de126e..71a877a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -804,30 +804,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
}
/**
- * The options factory supplied here was prone to resource leaks, because it did not have a way
- * to register native handles / objects that need to be disposed when the state backend is closed.
- *
- * @deprecated Use {@link #setRocksDBOptions(RocksDBOptionsFactory)} instead.
- */
- @Deprecated
- public void setOptions(OptionsFactory optionsFactory) {
- this.rocksDbOptionsFactory = optionsFactory instanceof RocksDBOptionsFactory
- ? (RocksDBOptionsFactory) optionsFactory
- : new RocksDBOptionsFactoryAdapter(optionsFactory);
- }
-
- /**
- * The options factory supplied here was prone to resource leaks, because it did not have a way
- * to register native handles / objects that need to be disposed when the state backend is closed.
- *
- * @deprecated Use {@link #setRocksDBOptions(RocksDBOptionsFactory)} and {@link #getRocksDBOptions()} instead.
- */
- @Deprecated
- public OptionsFactory getOptions() {
- return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
- }
-
- /**
* Gets the number of threads used to transfer files while snapshotting/restoring.
*/
public int getNumberOfTransferThreads() {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryCompatibilityTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryCompatibilityTest.java
deleted file mode 100644
index 8c77ac2..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryCompatibilityTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ReadableConfig;
-
-import org.junit.Test;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests that the changes introducing the {@link RocksDBOptionsFactory} are backwards compatible.
- */
-public class RocksDBOptionsFactoryCompatibilityTest {
-
- @Test
- public void testInheritance() {
- assertThat(new DefaultConfigurableOptionsFactory(), instanceOf(RocksDBOptionsFactory.class));
- }
-
- @Test
- public void testSetAndGet() throws Exception {
- final RocksDBStateBackend backend = new RocksDBStateBackend("file:///a/b/c");
- final OptionsFactory testFactory = new TestOptionsFactory();
-
- backend.setOptions(testFactory);
-
- assertSame(testFactory, backend.getOptions());
- }
-
- @Test
- public void testConfiguration() throws Exception {
- final RocksDBStateBackend backend = new RocksDBStateBackend("file:///a/b/c");
- final OptionsFactory testFactory = new TestOptionsFactory();
-
- backend.setOptions(testFactory);
-
- final TestOptionsFactory reconfigured = (TestOptionsFactory) backend
- .configure(new Configuration(), getClass().getClassLoader())
- .getOptions();
-
- assertTrue(reconfigured.wasConfigured);
- }
-
- // ------------------------------------------------------------------------
-
- private static class TestOptionsFactory implements ConfigurableOptionsFactory {
-
- boolean wasConfigured;
-
- @Override
- public OptionsFactory configure(ReadableConfig configuration) {
- wasConfigured = true;
- return this;
- }
-
- @Override
- public DBOptions createDBOptions(DBOptions currentOptions) {
- return currentOptions;
- }
-
- @Override
- public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
- return currentOptions;
- }
- }
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index d3eb6db..0cc84db 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -552,7 +552,6 @@ public class RocksDBStateBackendConfigTest {
rocksDbBackend = rocksDbBackend.configure(config, getClass().getClassLoader());
assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory);
- assertTrue(rocksDbBackend.getOptions() instanceof TestOptionsFactory);
try (RocksDBResourceContainer optionsContainer = rocksDbBackend.createOptionsAndResourceContainer()) {
DBOptions dbOptions = optionsContainer.getDbOptions();
@@ -640,7 +639,7 @@ public class RocksDBStateBackendConfigTest {
assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
- assertEquals(original.getOptions(), copy.getOptions());
+ assertEquals(original.getRocksDBOptions(), copy.getRocksDBOptions());
assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());
FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();