[FLINK-5310] [RocksDB] Harden the JNI library loading

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/609c046d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/609c046d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/609c046d

Branch: refs/heads/master
Commit: 609c046dba20cd07d9480715cfd1a6d78ed3a611
Parents: a078666
Author: Stephan Ewen <[email protected]>
Authored: Fri Dec 9 17:47:25 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 12 18:35:40 2016 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  2 -
 .../streaming/state/RocksDBStateBackend.java    | 75 ++++++++++++++++++++
 .../streaming/state/RocksDBInitResetTest.java   | 32 +++++++++
 .../state/RocksDBStateBackendConfigTest.java    | 14 +++-
 4 files changed, 119 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 4db622d..8637f6b 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -149,8 +149,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.instanceBasePath = instanceBasePath;
                this.instanceRocksDBPath = new File(instanceBasePath, "db");
 
-               RocksDB.loadLibrary();
-
                if (!instanceBasePath.exists()) {
                        if (!instanceBasePath.mkdirs()) {
                                throw new RuntimeException("Could not create 
RocksDB data directory.");

http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9ba0dc1..2109cea 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,11 +34,14 @@ import 
org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,6 +70,10 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateBackend.class);
 
+       private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
+
+       private static boolean rocksDbInitialized = false;
+
        // 
------------------------------------------------------------------------
        //  Static configuration values
        // 
------------------------------------------------------------------------
@@ -229,6 +237,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry) throws Exception {
 
+               // first, make sure that the RocksDB JNI library is loaded
+               // we do this explicitly here to have better error handling
+               String tempDir = 
env.getTaskManagerInfo().getTmpDirectories()[0];
+               ensureRocksDBIsLoaded(tempDir);
+
                lazyInitializeForJob(env, operatorIdentifier);
 
                File instanceBasePath = new File(getDbPath(), 
UUID.randomUUID().toString());
@@ -257,6 +270,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        Collection<KeyGroupsStateHandle> restoredState,
                        TaskKvStateRegistry kvStateRegistry) throws Exception {
 
+               // first, make sure that the RocksDB JNI library is loaded
+               // we do this explicitly here to have better error handling
+               String tempDir = 
env.getTaskManagerInfo().getTmpDirectories()[0];
+               ensureRocksDBIsLoaded(tempDir);
+
                lazyInitializeForJob(env, operatorIdentifier);
 
                File instanceBasePath = new File(getDbPath(), 
UUID.randomUUID().toString());
@@ -452,4 +470,61 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        ", checkpointStreamBackend=" + checkpointStreamBackend +
                        '}';
        }
+
+       // 
------------------------------------------------------------------------
+       //  static library loading utilities
+       // 
------------------------------------------------------------------------
+
+       private void ensureRocksDBIsLoaded(String tempDirectory) throws 
Exception {
+               // lock on something that cannot be in the user JAR
+               synchronized (org.apache.flink.runtime.taskmanager.Task.class) {
+                       if (!rocksDbInitialized) {
+
+                               final File tempDirFile = new 
File(tempDirectory);
+                               final String path = 
tempDirFile.getAbsolutePath();
+
+                               LOG.info("Attempting to load RocksDB native 
library and store it at '{}'", path);
+
+                               Throwable lastException = null;
+                               for (int attempt = 1; attempt <= 
ROCKSDB_LIB_LOADING_ATTEMPTS; attempt++) {
+                                       try {
+                                               // make sure the temp path 
exists
+                                               // noinspection 
ResultOfMethodCallIgnored
+                                               tempDirFile.mkdirs();
+
+                                               // explicitly load the JNI 
dependency if it has not been loaded before
+                                               
NativeLibraryLoader.getInstance().loadLibrary(path);
+
+                                               // this initialization here 
should validate that the loading succeeded
+                                               RocksDB.loadLibrary();
+
+                                               // seems to have worked
+                                               LOG.info("Successfully loaded 
RocksDB native library");
+                                               rocksDbInitialized = true;
+                                               return;
+                                       }
+                                       catch (Throwable t) {
+                                               lastException = t;
+                                               LOG.debug("RocksDB JNI library 
loading attempt {} failed", attempt, t);
+
+                                               // try to force RocksDB to 
attempt reloading the library
+                                               try {
+                                                       
resetRocksDBLoadedFlag();
+                                               } catch (Throwable tt) {
+                                                       LOG.debug("Failed to 
reset 'initialized' flag in RocksDB native code loader", tt);
+                                               }
+                                       }
+                               }
+
+                               throw new Exception("Could not load the native 
RocksDB library", lastException);
+                       }
+               }
+       }
+
+       @VisibleForTesting
+       static void resetRocksDBLoadedFlag() throws Exception {
+               final Field initField = 
org.rocksdb.NativeLibraryLoader.class.getDeclaredField("initialized");
+               initField.setAccessible(true);
+               initField.setBoolean(null, false);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
new file mode 100644
index 0000000..7343b56
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.junit.Test;
+
+/**
+ * This test checks that the RocksDB native code loader still responds to 
resetting the 'initialized' flag.
+ */
+public class RocksDBInitResetTest {
+
+       @Test
+       public void testResetInitFlag() throws Exception {
+               RocksDBStateBackend.resetRocksDBLoadedFlag();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index bf9b315..07fb48e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -22,12 +22,14 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.OperatingSystem;
 import org.junit.Assume;
 import org.junit.Before;
@@ -95,7 +97,7 @@ public class RocksDBStateBackendConfigTest {
                rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), 
testDir2.getAbsolutePath());
                assertArrayEquals(new String[] { testDir1.getAbsolutePath(), 
testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths());
 
-               Environment env = getMockEnvironment(new File[] {});
+               Environment env = getMockEnvironment();
                RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
                                createKeyedStateBackend(
                                                env,
@@ -360,6 +362,11 @@ public class RocksDBStateBackendConfigTest {
        }
 
        private static Environment getMockEnvironment(File[] tempDirs) {
+               final String[] tempDirStrings = new String[tempDirs.length];
+               for (int i = 0; i < tempDirs.length; i++) {
+                       tempDirStrings[i] = tempDirs[i].getAbsolutePath();
+               }
+
                IOManager ioMan = mock(IOManager.class);
                when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
 
@@ -371,8 +378,11 @@ public class RocksDBStateBackendConfigTest {
 
                TaskInfo taskInfo = mock(TaskInfo.class);
                when(env.getTaskInfo()).thenReturn(taskInfo);
-
                when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
+
+               TaskManagerRuntimeInfo tmInfo = new 
TaskManagerRuntimeInfo("localhost", new Configuration(), tempDirStrings);
+               when(env.getTaskManagerInfo()).thenReturn(tmInfo);
+
                return env;
        }
 }

Reply via email to