[FLINK-5310] [RocksDB] Harden the JNI library loading
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/609c046d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/609c046d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/609c046d Branch: refs/heads/master Commit: 609c046dba20cd07d9480715cfd1a6d78ed3a611 Parents: a078666 Author: Stephan Ewen <[email protected]> Authored: Fri Dec 9 17:47:25 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 12 18:35:40 2016 +0100 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 2 - .../streaming/state/RocksDBStateBackend.java | 75 ++++++++++++++++++++ .../streaming/state/RocksDBInitResetTest.java | 32 +++++++++ .../state/RocksDBStateBackendConfigTest.java | 14 +++- 4 files changed, 119 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 4db622d..8637f6b 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -149,8 +149,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.instanceBasePath = instanceBasePath; this.instanceRocksDBPath = new File(instanceBasePath, "db"); - RocksDB.loadLibrary(); - if 
(!instanceBasePath.exists()) { if (!instanceBasePath.mkdirs()) { throw new RuntimeException("Could not create RocksDB data directory."); http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 9ba0dc1..2109cea 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -17,6 +17,7 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.StateBackend; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -33,11 +34,14 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.RocksDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -66,6 +70,10 @@ public class RocksDBStateBackend extends AbstractStateBackend { private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class); + private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3; + + private static boolean rocksDbInitialized = false; + // 
------------------------------------------------------------------------ // Static configuration values // ------------------------------------------------------------------------ @@ -229,6 +237,11 @@ public class RocksDBStateBackend extends AbstractStateBackend { KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception { + // first, make sure that the RocksDB JNI library is loaded + // we do this explicitly here to have better error handling + String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0]; + ensureRocksDBIsLoaded(tempDir); + lazyInitializeForJob(env, operatorIdentifier); File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString()); @@ -257,6 +270,11 @@ public class RocksDBStateBackend extends AbstractStateBackend { Collection<KeyGroupsStateHandle> restoredState, TaskKvStateRegistry kvStateRegistry) throws Exception { + // first, make sure that the RocksDB JNI library is loaded + // we do this explicitly here to have better error handling + String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0]; + ensureRocksDBIsLoaded(tempDir); + lazyInitializeForJob(env, operatorIdentifier); File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString()); @@ -452,4 +470,61 @@ public class RocksDBStateBackend extends AbstractStateBackend { ", checkpointStreamBackend=" + checkpointStreamBackend + '}'; } + + // ------------------------------------------------------------------------ + // static library loading utilities + // ------------------------------------------------------------------------ + + private void ensureRocksDBIsLoaded(String tempDirectory) throws Exception { + // lock on something that cannot be in the user JAR + synchronized (org.apache.flink.runtime.taskmanager.Task.class) { + if (!rocksDbInitialized) { + + final File tempDirFile = new File(tempDirectory); + final String path = tempDirFile.getAbsolutePath(); + + LOG.info("Attempting to load RocksDB native library and store it at 
'{}'", path); + + Throwable lastException = null; + for (int attempt = 1; attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS; attempt++) { + try { + // make sure the temp path exists + // noinspection ResultOfMethodCallIgnored + tempDirFile.mkdirs(); + + // explicitly load the JNI dependency if it has not been loaded before + NativeLibraryLoader.getInstance().loadLibrary(path); + + // this initialization here should validate that the loading succeeded + RocksDB.loadLibrary(); + + // seems to have worked + LOG.info("Successfully loaded RocksDB native library"); + rocksDbInitialized = true; + return; + } + catch (Throwable t) { + lastException = t; + LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, t); + + // try to force RocksDB to attempt reloading the library + try { + resetRocksDBLoadedFlag(); + } catch (Throwable tt) { + LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", tt); + } + } + } + + throw new Exception("Could not load the native RocksDB library", lastException); + } + } + } + + @VisibleForTesting + static void resetRocksDBLoadedFlag() throws Exception { + final Field initField = org.rocksdb.NativeLibraryLoader.class.getDeclaredField("initialized"); + initField.setAccessible(true); + initField.setBoolean(null, false); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java new file mode 100644 index 0000000..7343b56 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java @@ -0,0 
+1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.junit.Test; + +/** + * This test checks that the RocksDB native code loader still responds to resetting the 'initialized' flag. + */ +public class RocksDBInitResetTest { + + @Test + public void testResetInitFlag() throws Exception { + RocksDBStateBackend.resetRocksDBLoadedFlag(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index bf9b315..07fb48e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -22,12 +22,14 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.OperatingSystem; import org.junit.Assume; import org.junit.Before; @@ -95,7 +97,7 @@ public class RocksDBStateBackendConfigTest { rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath()); assertArrayEquals(new String[] { testDir1.getAbsolutePath(), testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths()); - Environment env = getMockEnvironment(new File[] {}); + Environment env = getMockEnvironment(); RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. 
createKeyedStateBackend( env, @@ -360,6 +362,11 @@ public class RocksDBStateBackendConfigTest { } private static Environment getMockEnvironment(File[] tempDirs) { + final String[] tempDirStrings = new String[tempDirs.length]; + for (int i = 0; i < tempDirs.length; i++) { + tempDirStrings[i] = tempDirs[i].getAbsolutePath(); + } + IOManager ioMan = mock(IOManager.class); when(ioMan.getSpillingDirectories()).thenReturn(tempDirs); @@ -371,8 +378,11 @@ public class RocksDBStateBackendConfigTest { TaskInfo taskInfo = mock(TaskInfo.class); when(env.getTaskInfo()).thenReturn(taskInfo); - when(taskInfo.getIndexOfThisSubtask()).thenReturn(0); + + TaskManagerRuntimeInfo tmInfo = new TaskManagerRuntimeInfo("localhost", new Configuration(), tempDirStrings); + when(env.getTaskManagerInfo()).thenReturn(tmInfo); + return env; } }
