Repository: bookkeeper Updated Branches: refs/heads/sijie/bookkeeper_fallocate [created] 95028b9d8
bookie: fallocate & sync_file_range - introduce fallocate & sync_file_range in NativeIO to provide better preallocation & file sync logic. - if journalAdaptiveGroupWrites is disabled, use sync_file_range to sync the written range at a finer granularity - add more stats on journal flush & creation. RB_ID=260795 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/d9802962 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/d9802962 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/d9802962 Branch: refs/heads/sijie/bookkeeper_fallocate Commit: d9802962b20acf7a86209262871af7a2eb4cbed7 Parents: 95ea481 Author: Sijie Guo <[email protected]> Authored: Mon Jan 27 23:03:07 2014 -0800 Committer: Sijie Guo <[email protected]> Committed: Thu Nov 17 17:17:21 2016 -0800 ---------------------------------------------------------------------- bookkeeper-server/bin/bookkeeper | 9 +- bookkeeper-server/pom.xml | 73 ++++++++ bookkeeper-server/src/CMakeLists.txt | 142 ++++++++++++++++ bookkeeper-server/src/JNIFlags.cmake | 118 +++++++++++++ bookkeeper-server/src/config.h.cmake | 26 +++ .../org/apache/bookkeeper/bookie/Journal.java | 165 ++++++++++++------- .../bookkeeper/bookie/JournalChannel.java | 137 ++++++++++++--- .../java/org/apache/bookkeeper/util/Errno.java | 115 +++++++++++++ .../org/apache/bookkeeper/util/NativeIO.java | 147 +++++++++++++++-- .../src/org/apache/bookkeeper/util/NativeIO.c | 122 ++++++++++++++ 10 files changed, 955 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/bin/bookkeeper ---------------------------------------------------------------------- diff --git a/bookkeeper-server/bin/bookkeeper b/bookkeeper-server/bin/bookkeeper index 54be3fe..87429f9 100755 --- a/bookkeeper-server/bin/bookkeeper +++ b/bookkeeper-server/bin/bookkeeper @@ -180,10 
+180,11 @@ if [ -z "$BOOKIE_LOG_CONF" ]; then fi BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH:$BOOKIE_EXTRA_CLASSPATH" -BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH" -OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`" - -OPTS="-cp $BOOKIE_CLASSPATH $OPTS" +if [ "$BOOKIE_LOG_CONF" != "" ]; then + BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH" + OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`" +fi +OPTS="-cp $BOOKIE_CLASSPATH -Djava.library.path=$BK_HOME/target/native/target/usr/local/lib $OPTS" OPTS="$OPTS $BOOKIE_EXTRA_OPTS" http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/pom.xml ---------------------------------------------------------------------- diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml index bd143f1..730a659 100644 --- a/bookkeeper-server/pom.xml +++ b/bookkeeper-server/pom.xml @@ -327,6 +327,79 @@ </build> <profiles> <profile> + <id>native</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <executions> + <execution> + <id>enforce-os</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireOS> + <family>mac</family> + <family>unix</family> + <message>native build only supported on Mac or Unix</message> + </requireOS> + </rules> + <fail>true</fail> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>native-maven-plugin</artifactId> + <executions> + <execution> + <phase>compile</phase> + <goals> + <goal>javah</goal> + </goals> + <configuration> + <javahPath>${env.JAVA_HOME}/bin/javah</javahPath> + <javahClassNames> + <javahClassName>org.apache.bookkeeper.util.NativeIO</javahClassName> + </javahClassNames> + 
<javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>make</id> + <phase>compile</phase> + <goals><goal>run</goal></goals> + <configuration> + <target> + <exec executable="cmake" dir="${project.build.directory}/native" failonerror="true"> + <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/> + </exec> + <exec executable="make" dir="${project.build.directory}/native" failonerror="true"> + <arg line="VERBOSE=1"/> + </exec> + <exec executable="make" dir="${project.build.directory}/native" failonerror="true"></exec> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> <id>protobuf</id> <build> <plugins> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/CMakeLists.txt b/bookkeeper-server/src/CMakeLists.txt new file mode 100644 index 0000000..3d446b8 --- /dev/null +++ b/bookkeeper-server/src/CMakeLists.txt @@ -0,0 +1,142 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +cmake_minimum_required(VERSION 2.6 FATAL_ERROR) + +# Default to release builds +set(CMAKE_BUILD_TYPE, Release) + +include(JNIFlags.cmake NO_POLICY_SCOPE) + +# Compile a library with both shared and static variants +function(add_dual_library LIBNAME) + add_library(${LIBNAME} SHARED ${ARGN}) + add_library(${LIBNAME}_static STATIC ${ARGN}) + set_target_properties(${LIBNAME}_static PROPERTIES OUTPUT_NAME ${LIBNAME}) +endfunction(add_dual_library) + +# Link both a static and a dynamic target against some libraries +function(target_link_dual_libraries LIBNAME) + target_link_libraries(${LIBNAME} ${ARGN}) + target_link_libraries(${LIBNAME}_static ${ARGN}) +endfunction(target_link_dual_libraries) + +function(output_directory TGT DIR) + SET_TARGET_PROPERTIES(${TGT} PROPERTIES + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}") + SET_TARGET_PROPERTIES(${TGT} PROPERTIES + ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}") + SET_TARGET_PROPERTIES(${TGT} PROPERTIES + LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}") +endfunction(output_directory TGT DIR) + +function(dual_output_directory TGT DIR) + output_directory(${TGT} "${DIR}") + output_directory(${TGT}_static "${DIR}") +endfunction(dual_output_directory TGT DIR) + +# +# This macro alters the behavior of find_package and find_library. +# It does this by setting the CMAKE_FIND_LIBRARY_SUFFIXES global variable. +# You should save that variable before calling this function and restore it +# after you have accomplished your goal. +# +# The behavior is altered in two ways: +# 1. 
We always find shared libraries, never static; +# 2. We find shared libraries with the given version number. +# +# On Windows this function is a no-op. Windows does not encode +# version number information information into library path names. +# +macro(set_find_shared_library_version LVERS) + IF(${CMAKE_SYSTEM_NAME} MATCHES "Darwin") + # Mac OS uses .dylib + SET(CMAKE_FIND_LIBRARY_SUFFIXES ".${LVERS}.dylib") + ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") + # FreeBSD has always .so installed. + SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so") + ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "Windows") + # Windows doesn't support finding shared libraries by version. + ELSE() + # Most UNIX variants use .so + SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so.${LVERS}") + ENDIF() +endmacro(set_find_shared_library_version LVERS) + +if (NOT GENERATED_JAVAH) + # Must identify where the generated headers have been placed + MESSAGE(FATAL_ERROR "You must set the cmake variable GENERATED_JAVAH") +endif (NOT GENERATED_JAVAH) +find_package(JNI REQUIRED) + +SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES) +set_find_shared_library_version("1") +find_package(ZLIB REQUIRED) +SET(CMAKE_FIND_LIBRARY_SUFFIXES STORED_CMAKE_FIND_LIBRARY_SUFFIXES) + +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_GNU_SOURCE") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64") +set(D main/native/src/org/apache/bookkeeper) +set(T main/native/src/test/org/apache/bookkeeper) + +INCLUDE(CheckFunctionExists) +INCLUDE(CheckCSourceCompiles) +INCLUDE(CheckLibraryExists) +CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE) +CHECK_FUNCTION_EXISTS(fallocate HAVE_FALLOCATE) +CHECK_FUNCTION_EXISTS(posix_fadvise HAVE_POSIX_FADVISE) +CHECK_FUNCTION_EXISTS(posix_fallocate HAVE_POSIX_ALLOCATE) +CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL) + +include_directories( + ${GENERATED_JAVAH} + main/native/src + ${CMAKE_CURRENT_SOURCE_DIR} + 
${CMAKE_CURRENT_SOURCE_DIR}/src + ${CMAKE_BINARY_DIR} + ${JNI_INCLUDE_DIRS} +) +CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h) + +SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE) +add_dual_library(bookkeeper + ${D}/util/NativeIO.c +) +if (NEED_LINK_DL) + set(LIB_DL dl) +endif (NEED_LINK_DL) + +IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux") + # + # By embedding '$ORIGIN' into the RPATH of libbookkeeper.so, + # dlopen will look in the directory containing libbookkeeper.so. + # However, $ORIGIN is not supported by all operating systems. + # + SET_TARGET_PROPERTIES(bookkeeper + PROPERTIES INSTALL_RPATH "\$ORIGIN/") +ENDIF() + +target_link_dual_libraries(bookkeeper + ${LIB_DL} + ${JAVA_JVM_LIBRARY} +) +SET(LIBBOOKKEEPER_VERSION "1.0.0") +SET_TARGET_PROPERTIES(bookkeeper PROPERTIES + SOVERSION ${LIBBOOKKEEPER_VERSION}) +dual_output_directory(bookkeeper target/usr/local/lib) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/JNIFlags.cmake ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/JNIFlags.cmake b/bookkeeper-server/src/JNIFlags.cmake new file mode 100644 index 0000000..8333285 --- /dev/null +++ b/bookkeeper-server/src/JNIFlags.cmake @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +cmake_minimum_required(VERSION 2.6 FATAL_ERROR) + +# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit. +# This variable is set by maven. +if (JVM_ARCH_DATA_MODEL EQUAL 32) + # Force 32-bit code generation on amd64/x86_64, ppc64, sparc64 + if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -m32") + set(CMAKE_LD_FLAGS "${CMAKE_LD_FLAGS} -m32") + endif () + if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64") + # Set CMAKE_SYSTEM_PROCESSOR to ensure that find_package(JNI) will use + # the 32-bit version of libjvm.so. + set(CMAKE_SYSTEM_PROCESSOR "i686") + endif () +endif (JVM_ARCH_DATA_MODEL EQUAL 32) + +# Determine float ABI of JVM on ARM Linux +if (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux") + find_program(READELF readelf) + if (READELF MATCHES "NOTFOUND") + message(WARNING "readelf not found; JVM float ABI detection disabled") + else (READELF MATCHES "NOTFOUND") + execute_process( + COMMAND ${READELF} -A ${JAVA_JVM_LIBRARY} + OUTPUT_VARIABLE JVM_ELF_ARCH + ERROR_QUIET) + if (NOT JVM_ELF_ARCH MATCHES "Tag_ABI_VFP_args: VFP registers") + message("Soft-float JVM detected") + + # Test compilation with -mfloat-abi=softfp using an arbitrary libc function + # (typically fails with "fatal error: bits/predefs.h: No such file or directory" + # if soft-float dev libraries are not installed) + include(CMakePushCheckState) + cmake_push_check_state() + set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -mfloat-abi=softfp") + include(CheckSymbolExists) + check_symbol_exists(exit stdlib.h SOFTFP_AVAILABLE) + if (NOT SOFTFP_AVAILABLE) + message(FATAL_ERROR "Soft-float dev libraries required (e.g. 
'apt-get install libc6-dev-armel' on Debian/Ubuntu)") + endif (NOT SOFTFP_AVAILABLE) + cmake_pop_check_state() + + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfloat-abi=softfp") + endif () + endif (READELF MATCHES "NOTFOUND") +endif (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux") + +IF("${CMAKE_SYSTEM}" MATCHES "Linux") + # + # Locate JNI_INCLUDE_DIRS and JNI_LIBRARIES. + # Since we were invoked from Maven, we know that the JAVA_HOME environment + # variable is valid. So we ignore system paths here and just use JAVA_HOME. + # + FILE(TO_CMAKE_PATH "$ENV{JAVA_HOME}" _JAVA_HOME) + IF(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$") + SET(_java_libarch "i386") + ELSEIF (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64") + SET(_java_libarch "amd64") + ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm") + SET(_java_libarch "arm") + ELSE() + SET(_java_libarch ${CMAKE_SYSTEM_PROCESSOR}) + ENDIF() + SET(_JDK_DIRS "${_JAVA_HOME}/jre/lib/${_java_libarch}/*" + "${_JAVA_HOME}/jre/lib/${_java_libarch}" + "${_JAVA_HOME}/jre/lib/*" + "${_JAVA_HOME}/jre/lib" + "${_JAVA_HOME}/lib/*" + "${_JAVA_HOME}/lib" + "${_JAVA_HOME}/include/*" + "${_JAVA_HOME}/include" + "${_JAVA_HOME}" + ) + FIND_PATH(JAVA_INCLUDE_PATH + NAMES jni.h + PATHS ${_JDK_DIRS} + NO_DEFAULT_PATH) + #In IBM java, it's jniport.h instead of jni_md.h + FIND_PATH(JAVA_INCLUDE_PATH2 + NAMES jni_md.h jniport.h + PATHS ${_JDK_DIRS} + NO_DEFAULT_PATH) + SET(JNI_INCLUDE_DIRS ${JAVA_INCLUDE_PATH} ${JAVA_INCLUDE_PATH2}) + FIND_LIBRARY(JAVA_JVM_LIBRARY + NAMES jvm JavaVM + PATHS ${_JDK_DIRS} + NO_DEFAULT_PATH) + SET(JNI_LIBRARIES ${JAVA_JVM_LIBRARY}) + MESSAGE("JAVA_HOME=${JAVA_HOME}, JAVA_JVM_LIBRARY=${JAVA_JVM_LIBRARY}") + MESSAGE("JAVA_INCLUDE_PATH=${JAVA_INCLUDE_PATH}, JAVA_INCLUDE_PATH2=${JAVA_INCLUDE_PATH2}") + IF(JAVA_JVM_LIBRARY AND JAVA_INCLUDE_PATH AND JAVA_INCLUDE_PATH2) + MESSAGE("Located all JNI components successfully.") + ELSE() + MESSAGE(FATAL_ERROR "Failed to find a 
viable JVM installation under JAVA_HOME.") + ENDIF() +ELSE() + find_package(JNI REQUIRED) +ENDIF() http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/config.h.cmake ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/config.h.cmake b/bookkeeper-server/src/config.h.cmake new file mode 100644 index 0000000..d460b7f --- /dev/null +++ b/bookkeeper-server/src/config.h.cmake @@ -0,0 +1,26 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef CONFIG_H +#define CONFIG_H + +#cmakedefine HAVE_SYNC_FILE_RANGE +#cmakedefine HAVE_FALLOCATE +#cmakedefine HAVE_POSIX_FADVISE +#cmakedefine HAVE_POSIX_FALLOCATE + +#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 7be0984..dd62d28 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -39,10 +39,15 @@ import com.google.common.base.Stopwatch; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +<<<<<<< HEAD import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +======= +import org.apache.bookkeeper.stats.BookkeeperServerStatsLogger; +import org.apache.bookkeeper.stats.ServerStatsProvider; +>>>>>>> 2d5718f... 
bookie: fallocate & sync_file_range import org.apache.bookkeeper.util.DaemonThreadFactory; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.MathUtils; @@ -271,13 +276,13 @@ class Journal extends BookieCriticalThread implements CheckpointSource { /** * Journal Entry to Record */ - private class QueueEntry implements Runnable { - ByteBuffer entry; - long ledgerId; - long entryId; - WriteCallback cb; - Object ctx; - long enqueueTime; + private class QueueEntry extends SafeRunnable { + final ByteBuffer entry; + final long ledgerId; + final long entryId; + final WriteCallback cb; + final Object ctx; + final long enqueueTime; QueueEntry(ByteBuffer entry, long ledgerId, long entryId, WriteCallback cb, Object ctx, long enqueueTime) { @@ -304,19 +309,22 @@ class Journal extends BookieCriticalThread implements CheckpointSource { private final LinkedList<QueueEntry> forceWriteWaiters; private boolean shouldClose; private final boolean isMarker; - private final long lastFlushedPosition; + private final long startFlushPosition; + private final long endFlushPosition; private final long logId; private ForceWriteRequest(JournalChannel logFile, long logId, - long lastFlushedPosition, + long startFlushPosition, + long endFlushPosition, LinkedList<QueueEntry> forceWriteWaiters, boolean shouldClose, boolean isMarker) { this.forceWriteWaiters = forceWriteWaiters; this.logFile = logFile; this.logId = logId; - this.lastFlushedPosition = lastFlushedPosition; + this.startFlushPosition = startFlushPosition; + this.endFlushPosition = endFlushPosition; this.shouldClose = shouldClose; this.isMarker = isMarker; forceWriteQueueSize.inc(); @@ -324,22 +332,24 @@ class Journal extends BookieCriticalThread implements CheckpointSource { public int process(boolean shouldForceWrite) throws IOException { forceWriteQueueSize.dec(); + if (isMarker) { return 0; } try { if (shouldForceWrite) { - long startTime = MathUtils.nowInNano(); - this.logFile.forceWrite(false); - 
journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); + if (enableGroupForceWrites) { + this.logFile.forceWrite(false); + } else { + this.logFile.syncRangeOrForceWrite(this.startFlushPosition, + this.endFlushPosition - this.startFlushPosition); + } } - lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition); + lastLogMark.setCurLogMark(this.logId, this.endFlushPosition); // Notify the waiters that the force write succeeded - for (QueueEntry e : this.forceWriteWaiters) { - cbThreadPool.submit(e); - } + callback(); return this.forceWriteWaiters.size(); } @@ -348,6 +358,16 @@ class Journal extends BookieCriticalThread implements CheckpointSource { } } + void callback() { + for (QueueEntry e : this.forceWriteWaiters) { + if (null != e.ctx) { + cbThreadPool.submitOrdered(e.ctx, e); + } else { + cbThreadPool.submit(e); + } + } + } + public void closeFileIfNecessary() { // Close if shouldClose is set if (shouldClose) { @@ -374,14 +394,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource { // This holds the queue entries that should be notified after a // successful force write Thread threadToNotifyOnEx; - // should we group force writes - private final boolean enableGroupForceWrites; // make flush interval as a parameter - public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites) { + public ForceWriteThread(Thread threadToNotifyOnEx) { super("ForceWriteThread"); this.threadToNotifyOnEx = threadToNotifyOnEx; - this.enableGroupForceWrites = enableGroupForceWrites; } + @Override public void run() { LOG.info("ForceWrite Thread started"); @@ -394,25 +412,23 @@ class Journal extends BookieCriticalThread implements CheckpointSource { // Force write the file and then notify the write completions // - if (!req.isMarker) { - if (shouldForceWrite) { - // if we are going to force write, any request that is already in the - // queue will benefit from this force write - post a marker 
prior to issuing - // the flush so until this marker is encountered we can skip the force write - if (enableGroupForceWrites) { - forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, null, false, true)); - } + if (!req.isMarker && shouldForceWrite) { + // if we are going to force write, any request that is already in the + // queue will benefit from this force write - post a marker prior to issuing + // the flush so until this marker is encountered we can skip the force write + if (enableGroupForceWrites) { + forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, 0, null, false, true)); + } - // If we are about to issue a write, record the number of requests in - // the last force write and then reset the counter so we can accumulate - // requests in the write we are about to issue - if (numReqInLastForceWrite > 0) { - forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite); - numReqInLastForceWrite = 0; - } + // If we are about to issue a write, record the number of requests in + // the last force write and then reset the counter so we can accumulate + // requests in the write we are about to issue + if (numReqInLastForceWrite > 0) { + forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite); + numReqInLastForceWrite = 0; } - numReqInLastForceWrite += req.process(shouldForceWrite); } + numReqInLastForceWrite += req.process(shouldForceWrite); if (enableGroupForceWrites && // if its a marker we should switch back to flushing @@ -493,6 +509,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource { final File journalDirectory; final ServerConfiguration conf; final ForceWriteThread forceWriteThread; + // should we group force writes + private final boolean enableGroupForceWrites; // Time after which we will stop grouping and issue the flush private final long maxGroupWaitInNanos; // Threshold after which we flush any buffered journal entries @@ -503,13 +521,17 @@ class Journal extends 
BookieCriticalThread implements CheckpointSource { private final boolean flushWhenQueueEmpty; // should we hint the filesystem to remove pages from cache after force write private final boolean removePagesFromCache; + // journal align size + private final int journalAlignmentSize; + // journal format version to write + private final int journalFormatVersionToWrite; private final LastLogMark lastLogMark = new LastLogMark(0, 0); /** * The thread pool used to handle callback. */ - private final ExecutorService cbThreadPool; + private final OrderedSafeExecutor cbThreadPool; // journal entry queue to commit final LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>(); @@ -548,12 +570,19 @@ class Journal extends BookieCriticalThread implements CheckpointSource { this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB; this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB; this.maxBackupJournals = conf.getMaxBackupJournals(); - this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites()); + this.enableGroupForceWrites = conf.getJournalAdaptiveGroupWrites(); + this.forceWriteThread = new ForceWriteThread(this); this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec()); this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold(); this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold(); - this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(), - new DaemonThreadFactory()); + this.journalAlignmentSize = conf.getJournalAlignmentSize(); + this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite(); + this.cbThreadPool = OrderedSafeExecutor.newBuilder() + .name("BookieJournal") + .numThreads(conf.getNumJournalCallbackThreads()) + .statsLogger(Stats.get().getStatsLogger("journal")) + .threadFactory(new DaemonThreadFactory()) + .build(); // Unless there is a cap on the max wait (which requires group 
force writes) // we cannot skip flushing for queue empty @@ -646,9 +675,11 @@ class Journal extends BookieCriticalThread implements CheckpointSource { throws IOException { JournalChannel recLog; if (journalPos <= 0) { - recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize); + recLog = new JournalChannel(journalDirectory, journalId, + journalPreAllocSize, journalWriteBufferSize, statsLogger); } else { - recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos); + recLog = new JournalChannel(journalDirectory, journalId, + journalPreAllocSize, journalWriteBufferSize, journalPos, statsLogger); } int journalVersion = recLog.getFormatVersion(); try { @@ -701,6 +732,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource { if (!isPaddingRecord) { scanner.process(journalVersion, offset, recBuff); } + // update last log mark during replaying + lastLogMark.setCurLogMark(journalId, offset); } } finally { recLog.close(); @@ -785,10 +818,11 @@ class Journal extends BookieCriticalThread implements CheckpointSource { public void run() { LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>(); ByteBuffer lenBuff = ByteBuffer.allocate(4); - ByteBuffer paddingBuff = ByteBuffer.allocate(2 * conf.getJournalAlignmentSize()); + ByteBuffer paddingBuff = ByteBuffer.allocate(2 * journalAlignmentSize); ZeroBuffer.put(paddingBuff); JournalChannel logFile = null; forceWriteThread.start(); + Stopwatch journalAllocationWatcher = new Stopwatch(); Stopwatch journalCreationWatcher = new Stopwatch(); Stopwatch journalFlushWatcher = new Stopwatch(); long batchSize = 0; @@ -799,7 +833,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { // http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29 long logId = journalIds.isEmpty() ? 
System.currentTimeMillis() : journalIds.get(journalIds.size() - 1); BufferedChannel bc = null; - long lastFlushPosition = 0; + long lastFlushPosition = 0L; boolean groupWhenTimeout = false; long dequeueStartTime = 0L; @@ -815,15 +849,16 @@ class Journal extends BookieCriticalThread implements CheckpointSource { logId, journalPreAllocSize, journalWriteBufferSize, - conf.getJournalAlignmentSize(), + journalAlignmentSize, removePagesFromCache, - conf.getJournalFormatVersionToWrite()); + journalFormatVersionToWrite, + statsLogger); journalCreationStats.registerSuccessfulEvent( journalCreationWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); bc = logFile.getBufferedChannel(); - lastFlushPosition = bc.position(); + lastFlushPosition = 0; } if (qe == null) { @@ -880,14 +915,21 @@ class Journal extends BookieCriticalThread implements CheckpointSource { // toFlush is non null and not empty so should be safe to access getFirst if (shouldFlush) { - if (conf.getJournalFormatVersionToWrite() >= JournalChannel.V5) { - writePaddingBytes(logFile, paddingBuff, conf.getJournalAlignmentSize()); - } + long prevFlushPosition = lastFlushPosition; + journalFlushWatcher.reset().start(); + if (journalFormatVersionToWrite >= JournalChannel.V5) { + writePaddingBytes(logFile, paddingBuff, journalAlignmentSize); + } bc.flush(false); lastFlushPosition = bc.position(); - journalFlushStats.registerSuccessfulEvent( - journalFlushWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + + // start sync the range + if (!enableGroupForceWrites) { + logFile.startSyncRange(prevFlushPosition, lastFlushPosition); + } + journalFlushLatencyStats.registerSuccessfulEvent( + journalFlushWatcher.stop().elapsed(TimeUnit.MICROSECONDS)); // Trace the lifetime of entries through persistence if (LOG.isDebugEnabled()) { @@ -899,7 +941,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource { 
forceWriteBatchEntriesStats.registerSuccessfulValue(toFlush.size()); forceWriteBatchBytesStats.registerSuccessfulValue(batchSize); - forceWriteRequests.put(new ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false)); + forceWriteRequests.put(new ForceWriteRequest(logFile, logId, prevFlushPosition, + lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false)); toFlush = new LinkedList<QueueEntry>(); batchSize = 0L; // check whether journal file is over file limit @@ -936,8 +979,13 @@ class Journal extends BookieCriticalThread implements CheckpointSource { // we should be doing the following, but then we run out of // direct byte buffers // logFile.write(new ByteBuffer[] { lenBuff, qe.entry }); - bc.write(lenBuff); - bc.write(qe.entry); + int flushes = 0; + flushes += bc.write(lenBuff); + flushes += bc.write(qe.entry); + + journalMemAddFlushTimesStats.registerSuccessfulEvent(flushes); + journalMemAddLatencyStats.registerSuccessfulEvent( + MathUtils.elapsedMicroSec(qe.enqueueTime)); toFlush.add(qe); qe = null; @@ -970,11 +1018,10 @@ class Journal extends BookieCriticalThread implements CheckpointSource { LOG.info("Shutting down Journal"); forceWriteThread.shutdown(); cbThreadPool.shutdown(); - if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) { + ; + if (!cbThreadPool.forceShutdown(5, TimeUnit.SECONDS)) { LOG.warn("Couldn't shutdown journal callback thread gracefully. 
Forcing"); } - cbThreadPool.shutdownNow(); - running = false; this.interrupt(); this.join(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java index ad46e5c..e3077e1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java @@ -28,13 +28,23 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.ByteBuffer; import java.util.Arrays; - +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.NativeIO; import org.apache.bookkeeper.util.ZeroBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.google.common.base.Charsets.UTF_8; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.*; +import static org.apache.bookkeeper.util.NativeIO.*; /** * Simple wrapper around FileChannel to add versioning @@ -81,28 +91,36 @@ class JournalChannel implements Closeable { // The position of the file channel's last drop position private long lastDropPosition = 0L; + // Stats + private final OpStatsLogger journalPreallocationStats; + private final Counter journalForceWriteCounter; + private final OpStatsLogger journalForceWriteStats; + // Mostly used by tests JournalChannel(File journalDirectory, long 
logId) throws IOException { - this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE); + this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE, NullStatsLogger.INSTANCE); } // Open journal for scanning starting from the first record in journal. - JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException { - this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE); + JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, StatsLogger statsLogger) + throws IOException { + this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, statsLogger); } // Open journal for scanning starting from given position. JournalChannel(File journalDirectory, long logId, - long preAllocSize, int writeBufferSize, long position) throws IOException { - this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5); + long preAllocSize, int writeBufferSize, long position, StatsLogger statsLogger) + throws IOException { + this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5, statsLogger); } // Open journal to write JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, int journalAlignSize, - boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException { + boolean fRemoveFromPageCache, int formatVersionToWrite, + StatsLogger statsLogger) throws IOException { this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize, - START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite); + START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, statsLogger); } /** @@ -124,12 +142,20 @@ class JournalChannel implements Closeable { * whether to remove cached pages from page cache.
* @param formatVersionToWrite * format version to write + * @param statsLogger + * stats logger to record stats * @throws IOException */ - private JournalChannel(File journalDirectory, long logId, - long preAllocSize, int writeBufferSize, int journalAlignSize, - long position, boolean fRemoveFromPageCache, - int formatVersionToWrite) throws IOException { + private JournalChannel(File journalDirectory, + long logId, + long preAllocSize, + int writeBufferSize, + int journalAlignSize, + long position, + boolean fRemoveFromPageCache, + int formatVersionToWrite, + StatsLogger statsLogger) + throws IOException { this.journalAlignSize = journalAlignSize; this.zeros = ByteBuffer.allocate(journalAlignSize); this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize; @@ -149,9 +175,13 @@ class JournalChannel implements Closeable { + " suddenly appeared, is another bookie process running?"); } randomAccessFile = new RandomAccessFile(fn, "rw"); + fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD()); fc = randomAccessFile.getChannel(); formatVersion = formatVersionToWrite; + // preallocate the next preAllocSize bytes before writing the header + preallocate(); + int headerSize = (V4 == formatVersion) ?
VERSION_HEADER_SIZE : HEADER_SIZE; ByteBuffer bb = ByteBuffer.allocate(headerSize); ZeroBuffer.put(bb); @@ -162,11 +192,12 @@ class JournalChannel implements Closeable { fc.write(bb); bc = new BufferedChannel(fc, writeBufferSize); - forceWrite(true); - nextPrealloc = this.preAllocSize; - fc.write(zeros, nextPrealloc - journalAlignSize); + + // NOTE(review): the header is not synced here; presumably a later force write makes it durable -- confirm + // syncRangeOrForceWrite(0, HEADER_SIZE); } else { // open an existing file randomAccessFile = new RandomAccessFile(fn, "r"); + fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD()); fc = randomAccessFile.getChannel(); bc = null; // readonly @@ -215,7 +246,13 @@ class JournalChannel implements Closeable { LOG.error("Bookie journal file can seek to position :", e); } } - this.fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD()); + + // Stats + this.journalForceWriteCounter = statsLogger.getCounter(JOURNAL_NUM_FORCE_WRITES); + this.journalForceWriteStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_LATENCY); + this.journalPreallocationStats = statsLogger.getOpStatsLogger(JOURNAL_PREALLOCATION); + + LOG.info("Opened journal {} : fd {}", fn, fd); } int getFormatVersion() { @@ -229,14 +266,33 @@ class JournalChannel implements Closeable { return bc; } - void preAllocIfNeeded(long size) throws IOException { - if (bc.position() + size > nextPrealloc) { - nextPrealloc += preAllocSize; + private void preallocate() throws IOException { + long prevPrealloc = nextPrealloc; + nextPrealloc = prevPrealloc + preAllocSize; + if (!NativeIO.fallocateIfPossible(fd, prevPrealloc, preAllocSize)) { zeros.clear(); fc.write(zeros, nextPrealloc - journalAlignSize); } } + void preAllocIfNeeded(long size) throws IOException { + preAllocIfNeeded(size, null); + } + + void preAllocIfNeeded(long size, Stopwatch stopwatch) throws IOException { + if (bc.position() + size > nextPrealloc) { + if (null != stopwatch) { + stopwatch.reset().start(); + } + preallocate(); + if (null != stopwatch) {
+ journalPreallocationStats.registerSuccessfulEvent( + stopwatch.stop().elapsedTime(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); + } + } + } + int read(ByteBuffer dst) throws IOException { return fc.read(dst); @@ -246,11 +302,33 @@ class JournalChannel implements Closeable { fc.close(); } + public void startSyncRange(long offset, long bytes) throws IOException { + NativeIO.syncFileRangeIfPossible(fd, offset, bytes, SYNC_FILE_RANGE_WRITE); + } + + public boolean syncRangeIfPossible(long offset, long bytes) throws IOException { + if (NativeIO.syncFileRangeIfPossible(fd, offset, bytes, + SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER)) { + removeFromPageCacheIfPossible(offset + bytes); + } + // Always false: sync_file_range(2) writes no file metadata and does not flush disk + // write caches, so callers must still force write for durability. + return false; + } + public void forceWrite(boolean forceMetadata) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Journal ForceWrite"); } - long newForceWritePosition = bc.forceWrite(forceMetadata); + long startTimeNanos = MathUtils.nowInNano(); + forceWriteImpl(forceMetadata); + // collect stats + journalForceWriteCounter.inc(); + journalForceWriteStats.registerSuccessfulEvent( + MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + } + + private void removeFromPageCacheIfPossible(long offset) { // // For POSIX_FADV_DONTNEED, we want to drop from the beginning // of the file to a position prior to the current position.
@@ -265,11 +343,28 @@ class JournalChannel implements Closeable { // lastDropPosition newDropPos lastForceWritePosition // if (fRemoveFromPageCache) { - long newDropPos = newForceWritePosition - CACHE_DROP_LAG_BYTES; + long newDropPos = offset - CACHE_DROP_LAG_BYTES; if (lastDropPosition < newDropPos) { NativeIO.bestEffortRemoveFromPageCache(fd, lastDropPosition, newDropPos - lastDropPosition); } this.lastDropPosition = newDropPos; } } + + private void forceWriteImpl(boolean forceMetadata) throws IOException { + long newForceWritePosition = bc.forceWrite(forceMetadata); + removeFromPageCacheIfPossible(newForceWritePosition); + } + + public void syncRangeOrForceWrite(long offset, long bytes) throws IOException { + long startTimeNanos = MathUtils.nowInNano(); + // pre-flush the range, then always force write: the range sync alone is not durable + syncRangeIfPossible(offset, bytes); + forceWriteImpl(false); + // collect stats + journalForceWriteCounter.inc(); + journalForceWriteStats.registerSuccessfulEvent( + MathUtils.elapsedMicroSec(startTimeNanos), + TimeUnit.MICROSECONDS); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java new file mode 100644 index 0000000..e5d8ae8 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java @@ -0,0 +1,115 @@ +package org.apache.bookkeeper.util; + +import com.sun.jna.Library; +import com.sun.jna.Native; + +public class Errno { + + private static InterfaceDelegate delegate = (InterfaceDelegate) Native.loadLibrary("c", + InterfaceDelegate.class); + + /** + * The routine perror() produces a message on the standard error output, + * describing the last error encountered during a call to a system or + * library function.
First (if s is not NULL and *s is not a null byte + * ('\0')) the argument string s is printed, followed by a colon and a + * blank. Then the message and a new-line. + * + * To be of most use, the argument string should include the name of the + * function that incurred the error. The error number is taken from the + * external variable errno, which is set when errors occur but not cleared + * when non-erroneous calls are made. + * + * The global error list sys_errlist[] indexed by errno can be used to + * obtain the error message without the newline. The largest message number + * provided in the table is sys_nerr -1. Be careful when directly accessing + * this list because new error values may not have been added to + * sys_errlist[]. + * + * When a system call fails, it usually returns -1 and sets the variable + * errno to a value describing what went wrong. (These values can be found + * in <errno.h>.) Many library functions do likewise. The function perror() + * serves to translate this error code into human-readable form. Note that + * errno is undefined after a successful library call: this call may well + * change this variable, even though it succeeds, for example because it + * internally used some other library function that failed. Thus, if a + * failing call is not immediately followed by a call to perror(), the value + * of errno should be saved. + */ + public static int perror(String s) { + return delegate.perror(s); + } + + /** + * The strerror() function returns a string describing the error code passed + * in the argument errnum, possibly using the LC_MESSAGES part of the + * current locale to select the appropriate language. This string must not + * be modified by the application, but may be modified by a subsequent call + * to perror() or strerror(). No library function will modify this string. + * + * The strerror_r() function is similar to strerror(), but is thread safe.
+ * This function is available in two versions: an XSI-compliant version + * specified in POSIX.1-2001, and a GNU-specific version (available since + * glibc 2.0). If _XOPEN_SOURCE is defined with the value 600, then the + * XSI-compliant version is provided, otherwise the GNU-specific version is + * provided. + * + * The XSI-compliant strerror_r() is preferred for portable applications. It + * returns the error string in the user-supplied buffer buf of length + * buflen. + * + * The GNU-specific strerror_r() returns a pointer to a string containing + * the error message. This may be either a pointer to a string that the + * function stores in buf, or a pointer to some (immutable) static string + * (in which case buf is unused). If the function stores a string in buf, + * then at most buflen bytes are stored (the string may be truncated if + * buflen is too small) and the string always includes a terminating null + * byte. + * + */ + public static String strerror(int errnum) { + return delegate.strerror(errnum); + } + + public static String strerror() { + return strerror(errno()); + } + + /** + * The <errno.h> header file defines the integer variable errno, which is + * set by system calls and some library functions in the event of an error + * to indicate what went wrong. Its value is significant only when the call + * returned an error (usually -1), and a function that does succeed is + * allowed to change errno. + * + * Sometimes, when -1 is also a valid successful return value one has to + * zero errno before the call in order to detect possible errors. + * + * errno is defined by the ISO C standard to be a modifiable lvalue of type + * int, and must not be explicitly declared; errno may be a macro. errno is + * thread-local; setting it in one thread does not affect its value in any + * other thread. + * + * Valid error numbers are all non-zero; errno is never set to zero by any + * library function.
All the error names specified by POSIX.1 must have + * distinct values, with the exception of EAGAIN and EWOULDBLOCK, which may + * be the same. + * + * NOTE(review): Native.getLastError() returns the errno value JNA saved after + * its own most recent native call on this thread; it presumably does not + * reflect errno left by the JNI methods in NativeIO -- confirm. + * + */ + public static int errno() { + return Native.getLastError(); + } + + interface InterfaceDelegate extends Library { + + int perror(String s); + + String strerror(int errnum); + + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java index 2448842..9eb3a68 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java @@ -20,34 +20,56 @@ package org.apache.bookkeeper.util; import java.lang.reflect.Field; import java.io.FileDescriptor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.sun.jna.LastErrorException; -import com.sun.jna.Native; public final class NativeIO { private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class); private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */ + /** + * Wait upon writeout of all pages in the range before performing the write. + */ + public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1; + /** + * Initiate writeout of all those dirty pages in the range which are not presently + * under writeback. + */ + public static final int SYNC_FILE_RANGE_WRITE = 2; + /** + * Wait upon writeout of all pages in the range after performing the write.
+ */ + public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4; + + private static final int FALLOC_FL_KEEP_SIZE = 1; + private static boolean initialized = false; private static boolean fadvisePossible = true; + private static boolean syncFileRangePossible = true; + private static boolean sysFallocatePossible = true; + private static boolean posixFallocatePossible = true; static { try { - Native.register("c"); + LOG.info("Loading bookkeeper native library."); + System.loadLibrary("bookkeeper"); initialized = true; - } catch (NoClassDefFoundError e) { - LOG.info("JNA not found. Native methods will be disabled."); - } catch (UnsatisfiedLinkError e) { - LOG.info("Unable to link C library. Native methods will be disabled."); - } catch (NoSuchMethodError e) { - LOG.warn("Obsolete version of JNA present; unable to register C library"); + LOG.info("Loaded bookkeeper native library. Enabled Native IO."); + } catch (Throwable t) { + LOG.info("Unable to load bookkeeper native library. Native methods will be disabled : ", t); } } // fadvice - public static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException; + public static native int posix_fadvise(int fd, long offset, long len, int flag); + // posix_fallocate + public static native int posix_fallocate(int fd, long offset, long len); + // fallocate + public static native int fallocate(int fd, int mode, long offset, long len); + // sync_file_range(2) + public static native int sync_file_range(int fd, long offset, long len, int flags); private NativeIO() {} @@ -66,6 +88,7 @@ public final class NativeIO { return field; } + /** * Get system file descriptor (int) from FileDescriptor object.
* @param descriptor - FileDescriptor object to get fd from @@ -82,6 +105,92 @@ public final class NativeIO { return -1; } + public static boolean fallocateIfPossible(int fd, long offset, long nbytes) { + if (!initialized || fd < 0) { + return false; + } + boolean allocated = false; + if (sysFallocatePossible) { + allocated = sysFallocateIfPossible(fd, offset, nbytes); + } + if (!allocated && posixFallocatePossible) { + allocated = posixFallocateIfPossible(fd, offset, nbytes); + } + return allocated; + } + + private static boolean sysFallocateIfPossible(int fd, long offset, long nbytes) { + try { + int rc = fallocate(fd, FALLOC_FL_KEEP_SIZE, offset, nbytes); + if (rc != 0) { + LOG.error("Failed on sys fallocate file descriptor {}, offset {}, bytes {}, rc {} : {}", + new Object[] { fd, offset, nbytes, rc, Errno.strerror() }); + return false; + } + } catch (UnsupportedOperationException uoe) { + LOG.warn("sys fallocate isn't supported : ", uoe); + sysFallocatePossible = false; + } catch (UnsatisfiedLinkError nle) { + LOG.warn("Unsatisfied Link error: sys fallocate failed on file descriptor {}, offset {}, bytes {} : ", + new Object[] { fd, offset, nbytes, nle }); + sysFallocatePossible = false; + } catch (Exception e) { + LOG.error("Unknown exception: sys fallocate failed on file descriptor {}, offset {}, bytes {} : ", + new Object[] { fd, offset, nbytes, e }); + return false; + } + return sysFallocatePossible; + } + + private static boolean posixFallocateIfPossible(int fd, long offset, long nbytes) { + try { + int rc = posix_fallocate(fd, offset, nbytes); + if (rc != 0) { + LOG.error("Failed on posix_fallocate file descriptor {}, offset {}, bytes {}, rc {} : {}", + new Object[] { fd, offset, nbytes, rc, Errno.strerror(rc) }); // rc is the error number; posix_fallocate does not set errno + return false; + } + } catch (UnsupportedOperationException uoe) { + LOG.warn("posix_fallocate isn't supported : ", uoe); + posixFallocatePossible = false; + } catch (UnsatisfiedLinkError nle) { + LOG.warn("Unsatisfied Link error: posix_fallocate
failed on file descriptor {}, offset {}, bytes {} : ", + new Object[] { fd, offset, nbytes, nle }); + posixFallocatePossible = false; + } catch (Exception e) { + LOG.error("Unknown exception: posix_fallocate failed on file descriptor {}, offset {}, bytes {} : ", + new Object[] { fd, offset, nbytes, e }); + return false; + } + return posixFallocatePossible; + } + + public static boolean syncFileRangeIfPossible(int fd, long offset, long nbytes, int flags) { + if (!initialized || !syncFileRangePossible || fd < 0) { + return false; + } + try { + int rc = sync_file_range(fd, offset, nbytes, flags); + if (rc != 0) { + LOG.error("Failed on syncing file descriptor {}, offset {}, bytes {}, rc {} : {}", + new Object[] { fd, offset, nbytes, rc, Errno.strerror() }); + return false; + } + } catch (UnsupportedOperationException uoe) { + LOG.warn("sync_file_range isn't supported : ", uoe); + syncFileRangePossible = false; + } catch (UnsatisfiedLinkError nle) { + LOG.warn("Unsatisfied Link error: sync_file_range failed on file descriptor {}, offset {}, bytes {} : ", + new Object[] { fd, offset, nbytes, nle }); + syncFileRangePossible = false; + } catch (Exception e) { + LOG.error("Unknown exception: sync_file_range failed on file descriptor {}, offset {}, bytes {} : ", + new Object[] { fd, offset, nbytes, e }); + return false; + } + return syncFileRangePossible; + } + /** * Remove pages from the file system page cache when they wont * be accessed again @@ -89,16 +198,22 @@ public final class NativeIO { * @param fd The file descriptor of the source file. * @param offset The offset within the file. * @param len The length to be flushed.
- * - * @throws nothing => Best effort */ - public static void bestEffortRemoveFromPageCache(int fd, long offset, long len) { + posixFadviseIfPossible(fd, offset, len, POSIX_FADV_DONTNEED); + } + + public static boolean posixFadviseIfPossible(int fd, long offset, long len, int flags) { if (!initialized || !fadvisePossible || fd < 0) { - return; + return false; } try { - posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED); + int rc = posix_fadvise(fd, offset, len, flags); + if (rc != 0) { + LOG.error("Failed on posix_fadvise file descriptor {}, offset {}, bytes {}, flags {}, rc {} : {}", + new Object[] { fd, offset, len, flags, rc, Errno.strerror(rc) }); // rc is the error number; posix_fadvise does not set errno + return false; + } } catch (UnsupportedOperationException uoe) { LOG.warn("posix_fadvise is not supported : ", uoe); fadvisePossible = false; @@ -113,7 +228,9 @@ public final class NativeIO { // exception and forget LOG.warn("Unknown exception: posix_fadvise failed on file descriptor {}, offset {} : ", new Object[] { fd, offset, e }); + return false; } + return fadvisePossible; } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c new file mode 100644 index 0000000..b93bde4 --- /dev/null +++ b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <jni.h> + +#include <errno.h> +#include <fcntl.h> +#include <sys/syscall.h> +#include <sys/types.h> +#include <unistd.h> +#include "config.h" + +#if defined(HAVE_SYNC_FILE_RANGE) +# define my_sync_file_range sync_file_range +#elif defined(__NR_sync_file_range) +// RHEL 5 kernels have sync_file_range support, but the glibc +// included does not have the library function. We can +// still call it directly, and if it's not supported by the +// kernel, we'd get ENOSYS. See RedHat Bugzilla #518581 +static int manual_sync_file_range (int fd, __off64_t offset, __off64_t nbytes, unsigned int flags) +{ +#ifdef __x86_64__ + return syscall( __NR_sync_file_range, fd, offset, nbytes, flags); +#else + return syscall (__NR_sync_file_range, fd, + __LONG_LONG_PAIR ((long) (offset >> 32), (long) offset), + __LONG_LONG_PAIR ((long) (nbytes >> 32), (long) nbytes), + flags); +#endif +} +#define my_sync_file_range manual_sync_file_range +#endif + +/** + * public static native int sync_file_range( + * int fd, long offset, long len, int flags); + * + * Returns 0 on success or the errno value on failure, like every wrapper in + * this file (the posix_fallocate(3) convention). "_1" is JNI's escape for '_'.
+ */ +JNIEXPORT jint JNICALL +Java_org_apache_bookkeeper_util_NativeIO_sync_1file_1range( + JNIEnv *env, jclass clazz, + jint fd, jlong offset, jlong len, jint flags) +{ +#ifndef my_sync_file_range + /* not available at build time */ + return ENOSYS; +#else + return my_sync_file_range(fd, offset, len, flags) == 0 ? 0 : errno; +#endif +} + +#if defined(HAVE_FALLOCATE) +# define my_fallocate fallocate +#elif defined(__NR_fallocate) +static int manual_fallocate (int fd, int mode, __off64_t offset, __off64_t len) +{ +#ifdef __x86_64__ + return syscall( __NR_fallocate, fd, mode, offset, len); +#else + return syscall (__NR_fallocate, fd, mode, + __LONG_LONG_PAIR ((long) (offset >> 32), (long) offset), + __LONG_LONG_PAIR ((long) (len >> 32), (long) len)); +#endif +} +#define my_fallocate manual_fallocate +#endif + +JNIEXPORT jint JNICALL +Java_org_apache_bookkeeper_util_NativeIO_fallocate( + JNIEnv *env, jclass clazz, + jint fd, jint mode, jlong offset, jlong len) +{ +#ifndef my_fallocate + /* not available at build time */ + return ENOSYS; +#else + return my_fallocate(fd, mode, (off_t)offset, (off_t)len) == 0 ? 0 : errno; +#endif +} + +JNIEXPORT jint JNICALL +Java_org_apache_bookkeeper_util_NativeIO_posix_1fadvise( + JNIEnv *env, jclass clazz, + jint fd, jlong offset, jlong len, jint flags) +{ +#ifndef HAVE_POSIX_FADVISE + /* not available at build time */ + return ENOSYS; +#else + return posix_fadvise(fd, (off_t)offset, (off_t)len, flags); +#endif +} + +JNIEXPORT jint JNICALL +Java_org_apache_bookkeeper_util_NativeIO_posix_1fallocate( + JNIEnv *env, jclass clazz, + jint fd, jlong offset, jlong len) +{ +#ifndef HAVE_POSIX_FALLOCATE + /* not available at build time */ + return ENOSYS; +#else + return posix_fallocate(fd, (off_t)offset, (off_t)len); +#endif +}
