This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd91affd8040123e2f757b24859b7dad33e09532
Author: Yu Li <[email protected]>
AuthorDate: Thu Jun 11 19:32:39 2020 +0800

    [FLINK-18242][state-backend-rocksdb] Separate RocksDBOptionsFactory from OptionsFactory
    
    This closes #12673.
---
 flink-python/pyflink/datastream/state_backend.py   |  8 ++--
 .../streaming/state/OptionsFactoryAdapter.java     | 55 ++++++++++++++++++++++
 .../streaming/state/RocksDBOptionsFactory.java     | 28 +----------
 .../state/RocksDBOptionsFactoryAdapter.java        |  4 +-
 .../streaming/state/RocksDBStateBackend.java       | 10 ++--
 .../state/RocksDBStateBackendConfigTest.java       | 11 ++++-
 6 files changed, 78 insertions(+), 38 deletions(-)

diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py
index d894137..46ccf66 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -682,11 +682,11 @@ class RocksDBStateBackend(StateBackend):
                                            The options factory must have a 
default constructor.
         """
         gateway = get_gateway()
-        JOptionsFactory = 
gateway.jvm.org.apache.flink.contrib.streaming.state.OptionsFactory
+        JOptionsFactory = 
gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
         j_options_factory_clz = load_java_class(options_factory_class_name)
         if not 
get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz):
-            raise ValueError("The input class not implements OptionsFactory.")
-        
self._j_rocks_db_state_backend.setOptions(j_options_factory_clz.newInstance())
+            raise ValueError("The input class not implements 
RocksDBOptionsFactory.")
+        
self._j_rocks_db_state_backend.setRocksDBOptions(j_options_factory_clz.newInstance())
 
     def get_options(self):
         """
@@ -695,7 +695,7 @@ class RocksDBStateBackend(StateBackend):
 
         :return: The fully-qualified class name of the options factory in Java.
         """
-        j_options_factory = self._j_rocks_db_state_backend.getOptions()
+        j_options_factory = self._j_rocks_db_state_backend.getRocksDBOptions()
         if j_options_factory is not None:
             return j_options_factory.getClass().getName()
         else:
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java
new file mode 100644
index 0000000..666bc4b
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.util.ArrayList;
+
+/**
+ * A conversion from {@link RocksDBOptionsFactory} to {@link OptionsFactory}.
+ */
+public class OptionsFactoryAdapter implements OptionsFactory {
+
+       private static final long serialVersionUID = 1L;
+
+       private final RocksDBOptionsFactory rocksDBOptionsFactory;
+
+       OptionsFactoryAdapter(RocksDBOptionsFactory rocksDBOptionsFactory) {
+               this.rocksDBOptionsFactory = rocksDBOptionsFactory;
+       }
+
+       @Override
+       public DBOptions createDBOptions(DBOptions currentOptions) {
+               return rocksDBOptionsFactory.createDBOptions(currentOptions, 
new ArrayList<>());
+       }
+
+       @Override
+       public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
+               return 
rocksDBOptionsFactory.createColumnOptions(currentOptions, new ArrayList<>());
+       }
+
+       @VisibleForTesting
+       RocksDBOptionsFactory getRocksDBOptionsFactory() {
+               return rocksDBOptionsFactory;
+       }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index a5fd1e9..dffa9e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
-import java.util.ArrayList;
 import java.util.Collection;
 
 /**
@@ -32,7 +31,7 @@ import java.util.Collection;
  * <p>A typical pattern to use this OptionsFactory is as follows:
  *
  * <pre>{@code
- * rocksDbBackend.setOptions(new RocksDBOptionsFactory() {
+ * rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
  *
  *             public DBOptions createDBOptions(DBOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
  *                     return currentOptions.setMaxOpenFiles(1024);
@@ -49,8 +48,7 @@ import java.util.Collection;
  * });
  * }</pre>
  */
-@SuppressWarnings("deprecation")
-public interface RocksDBOptionsFactory extends OptionsFactory, 
java.io.Serializable {
+public interface RocksDBOptionsFactory extends java.io.Serializable {
 
        /**
         * This method should set the additional options on top of the current 
options object.
@@ -92,26 +90,4 @@ public interface RocksDBOptionsFactory extends 
OptionsFactory, java.io.Serializa
        default RocksDBNativeMetricOptions 
createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
                return nativeMetricOptions;
        }
-
-       // 
------------------------------------------------------------------------
-       //  for compatibility
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Do not override these methods, they are only to maintain interface 
compatibility with
-        * prior versions. They will be removed in one of the next versions.
-        */
-       @Override
-       default DBOptions createDBOptions(DBOptions currentOptions) {
-               return createDBOptions(currentOptions, new ArrayList<>());
-       }
-
-       /**
-        * Do not override these methods, they are only to maintain interface 
compatibility with
-        * prior versions. They will be removed in one of the next versions.
-        */
-       @Override
-       default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
-               return createColumnOptions(currentOptions, new ArrayList<>());
-       }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
index 36e8c3f..7efc32d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
@@ -67,9 +67,9 @@ final class RocksDBOptionsFactoryAdapter implements 
ConfigurableRocksDBOptionsFa
        }
 
        @Nullable
-       public static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory 
factory) {
+       static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
                return factory instanceof RocksDBOptionsFactoryAdapter
                                ? ((RocksDBOptionsFactoryAdapter) 
factory).optionsFactory
-                               : factory;
+                               : new OptionsFactoryAdapter(factory);
        }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 873f57d..e9d27b6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -861,9 +861,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         */
        @Deprecated
        public void setOptions(OptionsFactory optionsFactory) {
-               this.rocksDbOptionsFactory = optionsFactory instanceof 
RocksDBOptionsFactory
-                               ? (RocksDBOptionsFactory) optionsFactory
-                               : new 
RocksDBOptionsFactoryAdapter(optionsFactory);
+               this.rocksDbOptionsFactory = new 
RocksDBOptionsFactoryAdapter(optionsFactory);
        }
 
        /**
@@ -874,7 +872,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         */
        @Deprecated
        public OptionsFactory getOptions() {
-               return 
RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
+               if (rocksDbOptionsFactory == null) {
+                       return null;
+               } else {
+                       return 
RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
+               }
        }
 
        /**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 2167a00..1f691bf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -552,7 +552,14 @@ public class RocksDBStateBackendConfigTest {
                rocksDbBackend = rocksDbBackend.configure(config, 
getClass().getClassLoader());
 
                assertTrue(rocksDbBackend.getRocksDBOptions() instanceof 
TestOptionsFactory);
-               assertTrue(rocksDbBackend.getOptions() instanceof 
TestOptionsFactory);
+               OptionsFactory optionsFactory = rocksDbBackend.getOptions();
+               if (optionsFactory instanceof OptionsFactoryAdapter) {
+                       RocksDBOptionsFactory rocksDBOptionsFactory =
+                               ((OptionsFactoryAdapter) 
optionsFactory).getRocksDBOptionsFactory();
+                       assertTrue(rocksDBOptionsFactory instanceof 
TestOptionsFactory);
+               } else {
+                       assertTrue(optionsFactory instanceof 
TestOptionsFactory);
+               }
 
                try (RocksDBResourceContainer optionsContainer = 
rocksDbBackend.createOptionsAndResourceContainer()) {
                        DBOptions dbOptions = optionsContainer.getDbOptions();
@@ -640,7 +647,7 @@ public class RocksDBStateBackendConfigTest {
 
                assertEquals(original.isIncrementalCheckpointsEnabled(), 
copy.isIncrementalCheckpointsEnabled());
                assertArrayEquals(original.getDbStoragePaths(), 
copy.getDbStoragePaths());
-               assertEquals(original.getOptions(), copy.getOptions());
+               assertEquals(original.getRocksDBOptions(), 
copy.getRocksDBOptions());
                assertEquals(original.getPredefinedOptions(), 
copy.getPredefinedOptions());
 
                FsStateBackend copyCheckpointBackend = (FsStateBackend) 
copy.getCheckpointBackend();

Reply via email to