[ https://issues.apache.org/jira/browse/HDFS-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903502#comment-16903502 ]
xuzq commented on HDFS-14099:
-----------------------------
[~aajisaka] [~jlowe] [~churromorales] could you have a look? Thanks.
> Unknown frame descriptor when decompressing multiple frames in
> ZStandardDecompressor
> ------------------------------------------------------------------------------------
>
> Key: HDFS-14099
> URL: https://issues.apache.org/jira/browse/HDFS-14099
> Project: Hadoop HDFS
> Issue Type: Bug
> Environment: Hadoop Version: hadoop-3.0.3
> Java Version: 1.8.0_144
> Reporter: xuzq
> Assignee: xuzq
> Priority: Major
> Attachments: HDFS-14099-trunk-001.patch
>
>
> We need to use the ZSTD compression algorithm in Hadoop, so I wrote a simple
> demo like this for testing.
> {code:java}
> while ((size = fsDataInputStream.read(bufferV2)) > 0) {
>   countSize += size;
>   if (countSize == 65536 * 8) {
>     if (!isFinished) {
>       // finish a frame in zstd
>       cmpOut.finish();
>       isFinished = true;
>     }
>     fsDataOutputStream.flush();
>     fsDataOutputStream.hflush();
>   }
>   if (isFinished) {
>     LOG.info("Will resetState. N=" + n);
>     // reset the stream and write again
>     cmpOut.resetState();
>     isFinished = false;
>   }
>   cmpOut.write(bufferV2, 0, size);
>   bufferV2 = new byte[5 * 1024 * 1024];
>   n++;
> }
> {code}
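>
> For reference, a self-contained sketch of the same write pattern (the path,
> sizes, and the two-frame loop are illustrative assumptions; the native zstd
> library must be loaded for the codec to work):
> {code:java}
> // Self-contained sketch of the multi-frame write pattern above.
> // Path, sizes, and the two-frame loop are illustrative assumptions.
> import java.io.OutputStream;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.compress.CompressionOutputStream;
> import org.apache.hadoop.io.compress.ZStandardCodec;
>
> public class MultiFrameZstdWriter {
>   public static void main(String[] args) throws Exception {
>     Configuration conf = new Configuration();
>     FileSystem fs = FileSystem.get(conf);
>     ZStandardCodec codec = new ZStandardCodec();
>     codec.setConf(conf);
>     try (OutputStream raw = fs.create(new Path("/tmp/multi-frame.zst"))) {
>       CompressionOutputStream cmpOut = codec.createOutputStream(raw);
>       byte[] data = new byte[65536 * 8];
>       for (int frame = 0; frame < 2; frame++) {
>         if (frame > 0) {
>           cmpOut.resetState(); // start a new frame in the same file
>         }
>         cmpOut.write(data, 0, data.length);
>         cmpOut.finish();       // close the current ZSTD frame
>       }
>       cmpOut.close();
>     }
>   }
> }
> {code}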
>
> Then I used *hadoop fs -text* to read this file back, and it failed with the
> error below.
> {code:java}
> Exception in thread "main" java.lang.InternalError: Unknown frame descriptor
>     at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
>     at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:181)
>     at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
>     at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
>     at java.io.InputStream.read(InputStream.java:101)
>     at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:98)
>     at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:66)
>     at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:127)
>     at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:101)
>     at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:96)
>     at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
>     at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:303)
>     at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:285)
>     at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:269)
>     at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:119)
>     at org.apache.hadoop.fs.shell.Command.run(Command.java:176)
>     at org.apache.hadoop.fs.FsShell.run(FsShell.java:328)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
>     at org.apache.hadoop.fs.FsShell.main(FsShell.java:391)
> {code}
>
> So I dug into the code, including the JNI part, and found this bug: the
> *ZSTD_initDStream(stream)* method may be called twice for the same *Frame*.
> The first call is in *ZStandardDecompressor.c*:
> {code:c}
> if (size == 0) {
>     (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE);
>     size_t result = dlsym_ZSTD_initDStream(stream);
>     if (dlsym_ZSTD_isError(result)) {
>         THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
>         return (jint) 0;
>     }
> }
> {code}
> This call itself is correct, but *finished* is never set back to false, even
> when there is still data (a new frame) in the *CompressedBuffer* or
> *UserBuffer* waiting to be decompressed; the sketch below illustrates the
> resulting state.
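> A hedged sketch of that state at the *Decompressor* API (the helper method
> is hypothetical and the 64 KB sizes are assumptions; the class and methods
> are the ones quoted in this issue):
> {code:java}
> // Illustrative sketch only, not a test from the patch: shows the state
> // that DecompressorStream reacts to after the first ZSTD frame ends.
> // 'twoFrames' must hold two concatenated ZSTD frames.
> static void showStateAfterFirstFrame(byte[] twoFrames) throws java.io.IOException {
>   org.apache.hadoop.io.compress.zstd.ZStandardDecompressor decomp =
>       new org.apache.hadoop.io.compress.zstd.ZStandardDecompressor(64 * 1024);
>   decomp.setInput(twoFrames, 0, twoFrames.length);
>   byte[] out = new byte[64 * 1024];
>   while (decomp.decompress(out, 0, out.length) > 0) {
>     // consume the decompressed bytes of the first frame
>   }
>   // Per the JNI snippet above, the native code has now set finished = true
>   // and has already called ZSTD_initDStream for the next frame.
>   assert decomp.finished();
>   assert decomp.getRemaining() > 0; // the second frame is still buffered
>   // DecompressorStream reacts to finished() by calling decomp.reset(),
>   // which re-initializes the stream; the next decompress() then fails
>   // with "Unknown frame descriptor".
> }
> {code}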
> The second call happens in *org.apache.hadoop.io.compress.DecompressorStream*,
> via *decompressor.reset()*, because *finished* is always true after a *Frame*
> has been decompressed:
> {code:java}
> if (decompressor.finished()) {
>   // First see if there was any leftover buffered input from previous
>   // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
>   // all done; else reset, fix up input buffer, and get ready for next
>   // concatenated substream/"member".
>   int nRemaining = decompressor.getRemaining();
>   if (nRemaining == 0) {
>     int m = getCompressedData();
>     if (m == -1) {
>       // apparently the previous end-of-stream was also end-of-file:
>       // return success, as if we had never called getCompressedData()
>       eof = true;
>       return -1;
>     }
>     decompressor.reset();
>     decompressor.setInput(buffer, 0, m);
>     lastBytesSent = m;
>   } else {
>     // looks like it's a concatenated stream:  reset low-level zlib (or
>     // other engine) and buffers, then "resend" remaining input data
>     decompressor.reset();
>     int leftoverOffset = lastBytesSent - nRemaining;
>     assert (leftoverOffset >= 0);
>     // this recopies userBuf -> direct buffer if using native libraries:
>     decompressor.setInput(buffer, leftoverOffset, nRemaining);
>     // NOTE: this is the one place we do NOT want to save the number
>     // of bytes sent (nRemaining here) into lastBytesSent:  since we
>     // are resending what we've already sent before, offset is nonzero
>     // in general (only way it could be zero is if it already equals
>     // nRemaining), which would then screw up the offset calculation
>     // _next_ time around.  IOW, getRemaining() is in terms of the
>     // original, zero-offset bufferload, so lastBytesSent must be as
>     // well.  Cheesy ASCII art:
>     //
>     //          <------------ m, lastBytesSent ----------->
>     //          +===============================================+
>     //  buffer: |1111111111|22222222222222222|333333333333|     |
>     //          +===============================================+
>     //     #1:  <-- off -->|<-------- nRemaining --------->
>     //     #2:  <----------- off ----------->|<-- nRem. -->
>     //     #3:  (final substream: nRemaining == 0; eof = true)
>     //
>     // If lastBytesSent is anything other than m, as shown, then "off"
>     // will be calculated incorrectly.
>   }
> }
> {code}
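>
> For what it's worth, one way to avoid the double initialization could look
> like the sketch below. This is only an illustration of the idea, not the
> attached patch; *finished* is the field set via SetBooleanField in the JNI
> snippet above.
> {code:java}
> // Hypothetical sketch, NOT the attached patch: the JNI code has already
> // called ZSTD_initDStream when the previous frame ended, so reset()
> // could clear the Java-side 'finished' flag instead of triggering a
> // second native initialization for the same frame.
> @Override
> public void reset() {
>   finished = false; // let decompress() continue with the next frame
>   // intentionally no second native re-initialization here
> }
> {code}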