Author: cdouglas
Date: Tue Jul 1 18:15:30 2008
New Revision: 673254
URL: http://svn.apache.org/viewvc?rev=673254&view=rev
Log:
HADOOP-3604. Work around a JVM synchronization problem observed while
retrieving the address of direct buffers from compression code by obtaining
a lock during this call. Contributed by Arun C Murthy.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoCompressor.c
hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoDecompressor.c
hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c
hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
hadoop/core/trunk/src/native/src/org_apache_hadoop.h
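The essence of the change: every native call to GetDirectBufferAddress in the codec glue code is now bracketed by a monitor held on a shared java.lang.Class object, via the LOCK_CLASS/UNLOCK_CLASS macros added to src/native/src/org_apache_hadoop.h (last hunk below). A minimal sketch of the pattern, with identifier names abbreviated for illustration:

    /* 'this' is the codec instance; Codec_clazz is the cached jfieldID of
     * its static 'clazz' field (see the Java and C diffs below) */
    jobject clazz = (*env)->GetStaticObjectField(env, this, Codec_clazz);

    LOCK_CLASS(env, clazz, "Codec");
    void *buf_addr = (*env)->GetDirectBufferAddress(env, direct_buf);
    UNLOCK_CLASS(env, clazz, "Codec");

    if (buf_addr == 0) {
      return (jint)0;
    }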
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jul 1 18:15:30 2008
@@ -728,6 +728,10 @@
HADOOP-3649. Fix bug in removing blocks from the corrupted block map.
(Lohit Vijayarenu via shv)
+ HADOOP-3604. Work around a JVM synchronization problem observed while
+ retrieving the address of direct buffers from compression code by obtaining
+ a lock during this call. (Arun C Murthy via cdouglas)
+
Release 0.17.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoCompressor.java?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoCompressor.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoCompressor.java Tue Jul 1 18:15:30 2008
@@ -36,6 +36,9 @@
private static final Log LOG =
LogFactory.getLog(LzoCompressor.class.getName());
+ // HACK - Use this as a global lock in the JNI layer
+ private static Class clazz = LzoDecompressor.class;
+
private int directBufferSize;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java Tue Jul 1 18:15:30 2008
@@ -35,6 +35,9 @@
public class LzoDecompressor implements Decompressor {
private static final Log LOG =
LogFactory.getLog(LzoDecompressor.class.getName());
+
+ // HACK - Use this as a global lock in the JNI layer
+ private static Class clazz = LzoDecompressor.class;
private int directBufferSize;
private Buffer compressedDirectBuf = null;
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Tue Jul 1 18:15:30 2008
@@ -33,7 +33,10 @@
*/
public class ZlibCompressor implements Compressor {
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
-
+
+ // HACK - Use this as a global lock in the JNI layer
+ private static Class clazz = ZlibCompressor.class;
+
private long stream;
private CompressionLevel level;
private CompressionStrategy strategy;
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Tue Jul 1 18:15:30 2008
@@ -34,6 +34,9 @@
public class ZlibDecompressor implements Decompressor {
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
+ // HACK - Use this as a global lock in the JNI layer
+ private static Class clazz = ZlibDecompressor.class;
+
private long stream;
private CompressionHeader header;
private int directBufferSize;
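Note that all four codec classes above gain the same static `clazz` field; it exists only so the native layer has a stable object to lock on. The native side pairs with it in two steps, sketched here with shortened names (the real lookups are in the .c diffs below):

    static jfieldID Codec_clazz;

    /* once, in the codec's native initializer */
    Codec_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
                                           "Ljava/lang/Class;");

    /* per call, in each native compress/decompress method */
    jobject clazz = (*env)->GetStaticObjectField(env, this, Codec_clazz);
    /* 'clazz' is then passed to LOCK_CLASS/UNLOCK_CLASS */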
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Jul 1 18:15:30 2008
@@ -53,6 +53,7 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
@@ -1048,145 +1049,50 @@
* local fs) from the remote server.
* We use the file system so that we generate checksum files on the data.
* @param mapOutputLoc map-output to be fetched
- * @param localFilename the filename to write the data into
+ * @param filename the filename to write the data into
* @param connectionTimeout number of milliseconds for connection timeout
* @param readTimeout number of milliseconds for read timeout
* @return the path of the file that got created
* @throws IOException when something goes wrong
*/
private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
- Path localFilename)
+ Path filename)
throws IOException, InterruptedException {
- boolean good = false;
- OutputStream output = null;
+ // Connect
+ URLConnection connection =
+ mapOutputLoc.getOutputLocation().openConnection();
+ InputStream input = getInputStream(connection, DEFAULT_READ_TIMEOUT,
+ STALLED_COPY_TIMEOUT);
+
+ //We will put a file in memory if it meets certain criteria:
+ //1. The size of the (decompressed) file should be less than 25% of
+ // the total inmem fs
+ //2. There is space available in the inmem fs
+
+ long decompressedLength =
+ Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
+ long compressedLength =
+ Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
+
+ // Check if this map-output can be saved in-memory
+ boolean shuffleInMemory =
+   ramManager.canFitInMemory(decompressedLength);
+
+ // Shuffle
MapOutput mapOutput = null;
-
- try {
- URLConnection connection =
- mapOutputLoc.getOutputLocation().openConnection();
- InputStream input = getInputStream(connection, DEFAULT_READ_TIMEOUT,
- STALLED_COPY_TIMEOUT);
- //We will put a file in memory if it meets certain criteria:
- //1. The size of the (decompressed) file should be less than 25% of
- // the total inmem fs
- //2. There is space available in the inmem fs
-
- long decompressedLength =
- Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
- long compressedLength =
- Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-
- // Check if this map-output can be saved in-memory
- boolean canFitInMemory =
- ramManager.canFitInMemory(decompressedLength);
-
- if (canFitInMemory) {
- int requestedSize = (int)decompressedLength;
- // Check if we have enough buffer-space to keep map-output in-memory
- boolean createdNow =
- ramManager.reserve(requestedSize, input);
-
- LOG.info("Shuffling " + requestedSize + " bytes (" +
- compressedLength + " raw bytes) " +
- "into RAM-FS from " + mapOutputLoc.getTaskAttemptId());
-
- if (!createdNow) {
- // Reconnect
- try {
- connection = mapOutputLoc.getOutputLocation().openConnection();
- input = getInputStream(connection, DEFAULT_READ_TIMEOUT,
- STALLED_COPY_TIMEOUT);
- } catch (Throwable t) {
- // Cleanup
- ramManager.closeInMemoryFile(requestedSize);
- ramManager.unreserve(requestedSize);
-
- IOException ioe = new IOException("Failed to re-open " +
-   "connection to " +
-   mapOutputLoc.getHost());
- ioe.initCause(t);
- throw ioe;
- }
- }
-
- // Are map-outputs compressed?
- if (codec != null) {
- decompressor.reset();
- input = codec.createInputStream(input, decompressor);
- }
-
- output = new DataOutputBuffer((int)decompressedLength);
- }
- else {
- // Find out a suitable location for the output on local-filesystem
- localFilename = lDirAlloc.getLocalPathForWrite(
- localFilename.toUri().getPath(), decompressedLength, conf);
- LOG.info("Shuffling " + decompressedLength + " bytes (" +
- compressedLength + " raw bytes) " +
- "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
- output = localFileSys.create(localFilename);
- }
-
- long bytesRead = 0;
- try {
- try {
- byte[] buf = new byte[64 * 1024];
+ if (shuffleInMemory) {
+ LOG.info("Shuffling " + decompressedLength + " bytes (" +
+ compressedLength + " raw bytes) " +
+ "into RAM from " + mapOutputLoc.getTaskAttemptId());
- int n = input.read(buf, 0, buf.length);
- while (n > 0) {
- bytesRead += n;
- shuffleClientMetrics.inputBytes(n);
- output.write(buf, 0, n);
-
- // indicate we're making progress
- reporter.progress();
- n = input.read(buf, 0, buf.length);
- }
-
- LOG.info("Read " + bytesRead + " bytes from map-output " +
- "for " + mapOutputLoc.getTaskAttemptId());
-
- if (canFitInMemory) {
- byte[] shuffleData = ((DataOutputBuffer)output).getData();
- mapOutput = new MapOutput(mapOutputLoc.getTaskId(),
-   ((DataOutputBuffer)output).getData());
- ramManager.closeInMemoryFile(shuffleData.length);
- } else {
- mapOutput =
- new MapOutput(mapOutputLoc.getTaskId(), conf,
- localFileSys.makeQualified(localFilename),
- compressedLength);
- }
- } finally {
- output.close();
- }
- } finally {
- input.close();
- }
-
- // Sanity check
- good = (canFitInMemory) ? (bytesRead == decompressedLength) :
- (bytesRead == compressedLength);
- if (!good) {
- throw new IOException("Incomplete map output received for " +
- mapOutputLoc.getTaskAttemptId() + " from " +
- mapOutputLoc.getOutputLocation() + " (" +
- bytesRead + " instead of " +
- decompressedLength + ")"
- );
- }
- } finally {
- if (!good) {
- try {
- if (mapOutput != null) {
- mapOutput.discard();
- }
- } catch (Throwable th) {
- // IGNORED because we are cleaning up
- }
- }
+ mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
+   (int)decompressedLength);
+ } else {
+ LOG.info("Shuffling " + decompressedLength + " bytes (" +
+ compressedLength + " raw bytes) " +
+ "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
+
+ mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
+   compressedLength);
}
-
+
return mapOutput;
}
@@ -1235,7 +1141,197 @@
}
}
- }
+ private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
+ URLConnection connection,
+ InputStream input,
+ int mapOutputLength)
+ throws IOException, InterruptedException {
+ // Reserve ram for the map-output
+ boolean createdNow = ramManager.reserve(mapOutputLength, input);
+
+ // Reconnect if we need to
+ if (!createdNow) {
+ // Reconnect
+ try {
+ connection = mapOutputLoc.getOutputLocation().openConnection();
+ input = getInputStream(connection, DEFAULT_READ_TIMEOUT,
+ STALLED_COPY_TIMEOUT);
+ } catch (IOException ioe) {
+ LOG.info("Failed reopen connection to fetch map-output from " +
+ mapOutputLoc.getHost());
+
+ // Inform the ram-manager
+ ramManager.closeInMemoryFile(mapOutputLength);
+ ramManager.unreserve(mapOutputLength);
+
+ throw ioe;
+ }
+ }
+
+ // Are map-outputs compressed?
+ if (codec != null) {
+ decompressor.reset();
+ input = codec.createInputStream(input, decompressor);
+ }
+
+ // Copy map-output into an in-memory buffer
+ byte[] shuffleData = new byte[mapOutputLength];
+ MapOutput mapOutput =
+ new MapOutput(mapOutputLoc.getTaskId(), shuffleData);
+
+ int bytesRead = 0;
+ try {
+ int n = input.read(shuffleData, 0, shuffleData.length);
+ while (n > 0) {
+ bytesRead += n;
+ shuffleClientMetrics.inputBytes(n);
+
+ // indicate we're making progress
+ reporter.progress();
+ n = input.read(shuffleData, bytesRead,
+ (shuffleData.length-bytesRead));
+ }
+
+ LOG.info("Read " + bytesRead + " bytes from map-output for " +
+ mapOutputLoc.getTaskAttemptId());
+
+ input.close();
+ } catch (IOException ioe) {
+ LOG.info("Failed to shuffle from " +
mapOutputLoc.getTaskAttemptId(),
+ ioe);
+
+ // Inform the ram-manager
+ ramManager.closeInMemoryFile(mapOutputLength);
+ ramManager.unreserve(mapOutputLength);
+
+ // Discard the map-output
+ try {
+ mapOutput.discard();
+ } catch (IOException ignored) {
+ LOG.info("Failed to discard map-output from " +
+ mapOutputLoc.getTaskAttemptId(), ignored);
+ }
+ mapOutput = null;
+
+ // Close the streams
+ IOUtils.cleanup(LOG, input);
+
+ // Re-throw
+ throw ioe;
+ }
+
+ // Close the in-memory file
+ ramManager.closeInMemoryFile(mapOutputLength);
+
+ // Sanity check
+ if (bytesRead != mapOutputLength) {
+ // Inform the ram-manager
+ ramManager.unreserve(mapOutputLength);
+
+ // Discard the map-output
+ try {
+ mapOutput.discard();
+ } catch (IOException ignored) {
+ // IGNORED because we are cleaning up
+ LOG.info("Failed to discard map-output from " +
+ mapOutputLoc.getTaskAttemptId(), ignored);
+ }
+ mapOutput = null;
+
+ throw new IOException("Incomplete map output received for " +
+ mapOutputLoc.getTaskAttemptId() + " from " +
+ mapOutputLoc.getOutputLocation() + " (" +
+ bytesRead + " instead of " +
+ mapOutputLength + ")"
+ );
+ }
+
+ return mapOutput;
+ }
+
+ private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
+ InputStream input,
+ Path filename,
+ long mapOutputLength)
+ throws IOException {
+ // Find out a suitable location for the output on local-filesystem
+ Path localFilename =
+ lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
+ mapOutputLength, conf);
+
+ MapOutput mapOutput =
+ new MapOutput(mapOutputLoc.getTaskId(), conf,
+ localFileSys.makeQualified(localFilename),
+ mapOutputLength);
+
+
+ // Copy data to local-disk
+ OutputStream output = null;
+ long bytesRead = 0;
+ try {
+ output = localFileSys.create(localFilename);
+
+ byte[] buf = new byte[64 * 1024];
+ int n = input.read(buf, 0, buf.length);
+ while (n > 0) {
+ bytesRead += n;
+ shuffleClientMetrics.inputBytes(n);
+ output.write(buf, 0, n);
+
+ // indicate we're making progress
+ reporter.progress();
+ n = input.read(buf, 0, buf.length);
+ }
+
+ LOG.info("Read " + bytesRead + " bytes from map-output for " +
+ mapOutputLoc.getTaskAttemptId());
+
+ output.close();
+ input.close();
+ } catch (IOException ioe) {
+ LOG.info("Failed to shuffle from " +
mapOutputLoc.getTaskAttemptId(),
+ ioe);
+
+ // Discard the map-output
+ try {
+ mapOutput.discard();
+ } catch (IOException ignored) {
+ LOG.info("Failed to discard map-output from " +
+ mapOutputLoc.getTaskAttemptId(), ignored);
+ }
+ mapOutput = null;
+
+ // Close the streams
+ IOUtils.cleanup(LOG, input, output);
+
+ // Re-throw
+ throw ioe;
+ }
+
+ // Sanity check
+ if (bytesRead != mapOutputLength) {
+ try {
+ mapOutput.discard();
+ } catch (Throwable th) {
+ // IGNORED because we are cleaning up
+ LOG.info("Failed to discard map-output from " +
+ mapOutputLoc.getTaskAttemptId(), th);
+ }
+ mapOutput = null;
+
+ throw new IOException("Incomplete map output received for " +
+ mapOutputLoc.getTaskAttemptId() + " from " +
+ mapOutputLoc.getOutputLocation() + " (" +
+ bytesRead + " instead of " +
+ mapOutputLength + ")"
+ );
+ }
+
+ return mapOutput;
+
+ }
+
+ } // MapOutputCopier
private void configureClasspath(JobConf conf)
throws IOException {
Modified: hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoCompressor.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoCompressor.c?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoCompressor.c (original)
+++ hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoCompressor.c Tue Jul 1 18:15:30 2008
@@ -118,6 +118,7 @@
lzo_bytep dst, lzo_uintp dst_len,
lzo_voidp wrkmem, int compression_level );
+static jfieldID LzoCompressor_clazz;
static jfieldID LzoCompressor_finish;
static jfieldID LzoCompressor_finished;
static jfieldID LzoCompressor_uncompressedDirectBuf;
@@ -139,6 +140,8 @@
return;
}
+ LzoCompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
+ "Ljava/lang/Class;");
LzoCompressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
LzoCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
LzoCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class,
@@ -215,6 +218,8 @@
const char *lzo_compressor_function = lzo_compressors[compressor].function;
// Get members of LzoCompressor
+ jobject clazz = (*env)->GetStaticObjectField(env, this,
+ LzoCompressor_clazz);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this,
LzoCompressor_uncompressedDirectBuf);
lzo_uint uncompressed_direct_buf_len = (*env)->GetIntField(env, this,
@@ -231,20 +236,31 @@
jlong lzo_compressor_funcptr = (*env)->GetLongField(env, this,
LzoCompressor_lzoCompressor);
- // Get direct buffers
+ // Get the input direct buffer
+ LOCK_CLASS(env, clazz, "LzoCompressor");
lzo_bytep uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "LzoCompressor");
+
if (uncompressed_bytes == 0) {
return (jint)0;
}
+ // Get the output direct buffer
+ LOCK_CLASS(env, clazz, "LzoCompressor");
lzo_bytep compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "LzoCompressor");
+
if (compressed_bytes == 0) {
return (jint)0;
}
- lzo_voidp workmem = (*env)->GetDirectBufferAddress(env, working_memory_buf);
+ // Get the working-memory direct buffer
+ LOCK_CLASS(env, clazz, "LzoCompressor");
+ lzo_voidp workmem = (*env)->GetDirectBufferAddress(env, working_memory_buf);
+ UNLOCK_CLASS(env, clazz, "LzoCompressor");
+
if (workmem == 0) {
return (jint)0;
}
Modified: hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoDecompressor.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoDecompressor.c?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoDecompressor.c (original)
+++ hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoDecompressor.c Tue Jul 1 18:15:30 2008
@@ -88,6 +88,7 @@
/* 27 */ "lzo2a_decompress_safe"
};
+static jfieldID LzoDecompressor_clazz;
static jfieldID LzoDecompressor_finished;
static jfieldID LzoDecompressor_compressedDirectBuf;
static jfieldID LzoDecompressor_compressedDirectBufLen;
@@ -106,6 +107,8 @@
return;
}
+ LzoDecompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
+ "Ljava/lang/Class;");
LzoDecompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
LzoDecompressor_compressedDirectBuf = (*env)->GetFieldID(env, class,
"compressedDirectBuf",
@@ -173,6 +176,8 @@
const char *lzo_decompressor_function = lzo_decompressors[decompressor];
// Get members of LzoDecompressor
+ jobject clazz = (*env)->GetStaticObjectField(env, this,
+ LzoDecompressor_clazz);
jobject compressed_direct_buf = (*env)->GetObjectField(env, this,
LzoDecompressor_compressedDirectBuf);
lzo_uint compressed_direct_buf_len = (*env)->GetIntField(env, this,
@@ -186,15 +191,22 @@
jlong lzo_decompressor_funcptr = (*env)->GetLongField(env, this,
LzoDecompressor_lzoDecompressor);
- // Get direct buffers
+ // Get the input direct buffer
+ LOCK_CLASS(env, clazz, "LzoDecompressor");
lzo_bytep uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "LzoDecompressor");
+
if (uncompressed_bytes == 0) {
return (jint)0;
}
+ // Get the output direct buffer
+ LOCK_CLASS(env, clazz, "LzoDecompressor");
lzo_bytep compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "LzoDecompressor");
+
if (compressed_bytes == 0) {
return (jint)0;
}
Modified: hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c (original)
+++ hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c Tue Jul 1 18:15:30 2008
@@ -47,6 +47,7 @@
#include "org_apache_hadoop_io_compress_zlib.h"
#include "org_apache_hadoop_io_compress_zlib_ZlibCompressor.h"
+static jfieldID ZlibCompressor_clazz;
static jfieldID ZlibCompressor_stream;
static jfieldID ZlibCompressor_uncompressedDirectBuf;
static jfieldID ZlibCompressor_uncompressedDirectBufOff;
@@ -82,6 +83,8 @@
LOAD_DYNAMIC_SYMBOL(dlsym_deflateEnd, env, libz, "deflateEnd");
// Initialize the requisite fieldIds
+ ZlibCompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
+ "Ljava/lang/Class;");
ZlibCompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
ZlibCompressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
ZlibCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
@@ -186,6 +189,9 @@
return (jint)0;
}
+ // Get members of ZlibCompressor
+ jobject clazz = (*env)->GetStaticObjectField(env, this,
+ ZlibCompressor_clazz);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this,
ZlibCompressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this,
@@ -200,14 +206,22 @@
jboolean finish = (*env)->GetBooleanField(env, this,
ZlibCompressor_finish);
+ // Get the input direct buffer
+ LOCK_CLASS(env, clazz, "ZlibCompressor");
Bytef* uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "ZlibCompressor");
+
if (uncompressed_bytes == 0) {
return (jint)0;
}
+ // Get the output direct buffer
+ LOCK_CLASS(env, clazz, "ZlibCompressor");
Bytef* compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "ZlibCompressor");
+
if (compressed_bytes == 0) {
return (jint)0;
}
Modified: hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c (original)
+++ hadoop/core/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c Tue Jul 1 18:15:30 2008
@@ -47,6 +47,7 @@
#include "org_apache_hadoop_io_compress_zlib.h"
#include "org_apache_hadoop_io_compress_zlib_ZlibDecompressor.h"
+static jfieldID ZlibDecompressor_clazz;
static jfieldID ZlibDecompressor_stream;
static jfieldID ZlibDecompressor_compressedDirectBuf;
static jfieldID ZlibDecompressor_compressedDirectBufOff;
@@ -82,6 +83,8 @@
LOAD_DYNAMIC_SYMBOL(dlsym_inflateEnd, env, libz, "inflateEnd");
// Initialize the requisite fieldIds
+ ZlibDecompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
+ "Ljava/lang/Class;");
ZlibDecompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
ZlibDecompressor_needDict = (*env)->GetFieldID(env, class, "needDict",
"Z");
ZlibDecompressor_finished = (*env)->GetFieldID(env, class, "finished",
"Z");
@@ -181,6 +184,9 @@
return (jint)0;
}
+ // Get members of ZlibDecompressor
+ jobject clazz = (*env)->GetStaticObjectField(env, this,
+ ZlibDecompressor_clazz);
jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env, this,
ZlibDecompressor_compressedDirectBuf);
jint compressed_direct_buf_off = (*env)->GetIntField(env, this,
@@ -193,14 +199,22 @@
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this,
ZlibDecompressor_directBufferSize);
+ // Get the input direct buffer
+ LOCK_CLASS(env, clazz, "ZlibDecompressor");
Bytef *compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "ZlibDecompressor");
+
if (!compressed_bytes) {
return (jint)0;
}
+ // Get the output direct buffer
+ LOCK_CLASS(env, clazz, "ZlibDecompressor");
Bytef *uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "ZlibDecompressor");
+
if (!uncompressed_bytes) {
return (jint)0;
}
Modified: hadoop/core/trunk/src/native/src/org_apache_hadoop.h
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/native/src/org_apache_hadoop.h?rev=673254&r1=673253&r2=673254&view=diff
==============================================================================
--- hadoop/core/trunk/src/native/src/org_apache_hadoop.h (original)
+++ hadoop/core/trunk/src/native/src/org_apache_hadoop.h Tue Jul 1 18:15:30 2008
@@ -79,6 +79,19 @@
return; \
}
+#define LOCK_CLASS(env, clazz, classname) \
+ if ((*env)->MonitorEnter(env, clazz) != 0) { \
+ char exception_msg[128]; \
+ snprintf(exception_msg, 128, "Failed to lock %s", classname); \
+ THROW(env, "java/lang/InternalError", exception_msg); \
+ }
+
+#define UNLOCK_CLASS(env, clazz, classname) \
+ if ((*env)->MonitorExit(env, clazz) != 0) { \
+ char exception_msg[128]; \
+ snprintf(exception_msg, 128, "Failed to unlock %s", classname); \
+ THROW(env, "java/lang/InternalError", exception_msg); \
+ }
#endif
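These macros are the JNI analogue of a Java synchronized (clazz) block: MonitorEnter/MonitorExit return zero on success, and a nonzero return raises java/lang/InternalError through the existing THROW macro. A self-contained usage sketch (the helper name is hypothetical; the codec .c files above inline the same pair directly):

    #include <jni.h>
    #include "org_apache_hadoop.h"

    /* Fetch a direct buffer's address while holding the class monitor.
     * The lock spans only the single GetDirectBufferAddress call, so no
     * early return in the caller can leak the monitor. */
    static void *get_direct_buf_addr(JNIEnv *env, jobject clazz,
                                     jobject direct_buf) {
      LOCK_CLASS(env, clazz, "Codec");
      void *addr = (*env)->GetDirectBufferAddress(env, direct_buf);
      UNLOCK_CLASS(env, clazz, "Codec");
      return addr; /* NULL if direct_buf is not a direct buffer */
    }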