viirya opened a new pull request #3350:
URL: https://github.com/apache/hadoop/pull/3350
In the newly added BuiltInGzipCompressor, the header and trailer should not
be static variables, because they can be modified by different instances at
the same time.
I prepared a test. However, since this is a race condition, the test does not
deterministically reproduce the issue. I am posting it here for reference.
```java
@Test
public void testGzipCompressorsInSameJVM() throws InterruptedException {
// This test makes sure multiple BuiltInGzipCompressor can work in same
JVM.
// don't use native libs
ZlibFactory.setNativeZlibLoaded(false);
Configuration conf = new Configuration();
CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class,
conf);
class CompressionThread extends Thread {
Compressor compressor;
byte[] b;
int inputSize;
boolean failed = false;
Exception exception;
CompressionThread(CompressionCodec codec, int inputSize, byte[] b) {
compressor = codec.createCompressor();
assertEquals("Compressor is not a BuiltInGzipCompressor",
BuiltInGzipCompressor.class, compressor.getClass());
this.b = b;
this.inputSize = inputSize;
}
public void run() {
compressor.setInput(b,0, b.length);
compressor.finish();
byte[] output = new byte[inputSize + 1024];
int outputOff = 0;
try {
while (!compressor.finished()) {
byte[] buf = new byte[2];
int compressed = compressor.compress(buf, 0, buf.length);
System.arraycopy(buf, 0, output, outputOff, compressed);
outputOff += compressed;
}
} catch (IOException e) {
failed = true;
exception = e;
}
DataInputBuffer gzbuf = new DataInputBuffer();
gzbuf.reset(output, outputOff);
try {
Decompressor decom = codec.createDecompressor();
assertNotNull("Got a null decompressor", decom);
assertEquals("Decompressor is not a BuiltInGzipDecompressor",
BuiltInGzipDecompressor.class, decom.getClass());
InputStream gzin = codec.createInputStream(gzbuf, decom);
DataOutputBuffer dflbuf = new DataOutputBuffer();
dflbuf.reset();
IOUtils.copyBytes(gzin, dflbuf, 4096);
byte[] dflchk = Arrays.copyOf(dflbuf.getData(),
dflbuf.getLength());
assertArrayEquals("decompressed data must be the same as original
input", b, dflchk);
} catch (IOException e) {
failed = true;
exception = e;
}
}
}
Random r = new Random();
while (true) {
long seed = r.nextLong();
r.setSeed(seed);
int inputSize = r.nextInt(128 * 1024) + 1;
byte[] b1 = new byte[inputSize];
r.nextBytes(b1);
CompressionThread p1 = new CompressionThread(codec, inputSize, b1);
// Tweak the input bytes a bit. So their crc values will be different.
byte[] b2 = new byte[inputSize];
System.arraycopy(b1, 0, b2, 0, inputSize);
b2[0] = (byte) r.nextInt();
CompressionThread p2 = new CompressionThread(codec, inputSize, b2);
p1.start();
p2.start();
p1.join();
p2.join();
if (p1.failed) {
fail("Failure happens during two compression threads running: " +
p1.exception.getMessage());
}
if (p2.failed) {
fail("Failure happens during two compression threads running: " +
p2.exception.getMessage());
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]