[ https://issues.apache.org/jira/browse/HDFS-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923156#comment-17923156 ]
Sungwoo Park edited comment on HDFS-14099 at 2/3/25 2:38 AM:
-------------------------------------------------------------

I still see this problem with Hadoop 3.3.6 when the input is a large file (e.g., 25MB) that does not compress well. (It seems I cannot attach a file larger than 20MB.) For the Java library I used zstd-jni-1.5.5-1.jar, and the Zstd library is version 1.5.5-1. The zstd command successfully compresses and decompresses the sample file, but when tested with a simple Java program, ZStandardCodec fails to restore the original data.

{code:java}
void test(byte[] wrapperData, int wrapperSize) throws IOException {
  Configuration conf = new Configuration();
  ZStandardCodec codec = new ZStandardCodec();
  codec.setConf(conf);

  // Compress the data
  ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
  try (OutputStream compressor = codec.createOutputStream(compressedOut)) {
    compressor.write(wrapperData, 0, wrapperSize);
  }
  byte[] compressedData = compressedOut.toByteArray();
  LOG.error("xxxxx Compressed Data length: " + compressedData.length);

  // Decompress the data
  ByteArrayInputStream compressedIn = new ByteArrayInputStream(compressedData);
  ByteArrayOutputStream decompressedOut = new ByteArrayOutputStream();
  try (InputStream decompressor = codec.createInputStream(compressedIn)) {
    byte[] buffer = new byte[1024];
    int bytesRead;
    while ((bytesRead = decompressor.read(buffer)) != -1) {
      decompressedOut.write(buffer, 0, bytesRead);
    }
  }
  byte[] decompressedData = decompressedOut.toByteArray();
  LOG.error("xxxxx Decompressed length: " + decompressedData.length);

  // Compare the original and decompressed data
  if (Arrays.equals(Arrays.copyOf(wrapperData, wrapperSize), decompressedData)) {
    LOG.info("Compression and decompression are successful. Data matches.");
  } else {
    LOG.error("Data mismatch! Compression or decompression failed.");
  }
}
{code}

A few examples of calling test():

2025-02-03 02:11:42,012 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 1009
2025-02-03 02:11:42,013 [DAG-1-5-1] INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new decompressor [.zst]
2025-02-03 02:11:42,013 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 1000
2025-02-03 02:11:42,013 [DAG-1-5-1] INFO org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,029 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 10009
2025-02-03 02:11:42,029 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 10000
2025-02-03 02:11:42,029 [DAG-1-5-1] INFO org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,063 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 100009
2025-02-03 02:11:42,064 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 100000
2025-02-03 02:11:42,064 [DAG-1-5-1] INFO org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,142 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 1000051
2025-02-03 02:11:42,143 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 1000000
2025-02-03 02:11:42,144 [DAG-1-5-1] INFO org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,144 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxx testing pokemon.csv after loading byte[]
2025-02-03 02:11:42,155 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 19011
2025-02-03 02:11:42,155 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 45254
2025-02-03 02:11:42,155 [DAG-1-5-1] INFO org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,155 [DAG-1-5-1] ERROR
2025-02-03 02:11:42,339 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxx testing file.zst after loading byte[]
2025-02-03 02:11:42,460 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 26633605
2025-02-03 02:11:42,494 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 26632390
2025-02-03 02:11:42,500 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - Data mismatch! Compression or decompression failed.
2025-02-03 02:11:42,500 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxx testing file.zst by reading directly
2025-02-03 02:11:42,501 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxx7 error
java.lang.InternalError: Unknown frame descriptor
  at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method) ~[hadoop-common-3.3.6.jar:?]
  at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:188) ~[hadoop-common-3.3.6.jar:?]
  at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111) ~[hadoop-common-3.3.6.jar:?]
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105) ~[hadoop-common-3.3.6.jar:?]
  at java.io.InputStream.read(InputStream.java:220) ~[?:?]
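Since the sample file is too large to attach, the following is a minimal sketch of how the test() method above might be driven with synthetic input. It assumes test() and LOG from the snippet above are in scope; the method name testWithSyntheticInput is mine, and java.util.Random bytes stand in for the real file because pseudo-random data likewise does not compress well. Synthetic input is not guaranteed to trigger the same mismatch, so treat this only as a starting point.

{code:java}
// Hypothetical driver for the test() method shown above (assumed to be in
// scope together with LOG). Pseudo-random bytes stand in for the ~25MB
// sample file: they are essentially incompressible, i.e. they "do not
// compress well".
import java.io.IOException;
import java.util.Random;

public void testWithSyntheticInput() throws IOException {
  int size = 25 * 1024 * 1024;        // roughly the size of the sample file
  byte[] data = new byte[size];
  new Random(42L).nextBytes(data);    // deterministic but incompressible
  test(data, size);                   // round-trip through ZStandardCodec
}
{code}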
was (Author: glapark):
I still see this problem with Hadoop 3.3.6 when the input is a large file (e.g., 25MB) that does not compress well. I have attached a sample input file for reproducing the problem. The zstd command successfully compresses and decompresses the sample file, but when tested with a simple Java program, ZStandardCodec fails to restore the original data.
> Unknown frame descriptor when decompressing multiple frames in ZStandardDecompressor
> -------------------------------------------------------------------------------------
>
>                 Key: HDFS-14099
>                 URL: https://issues.apache.org/jira/browse/HDFS-14099
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: compress, io
>    Affects Versions: 3.4.0, 3.2.3, 3.3.2
>         Environment: Hadoop Version: hadoop-3.0.3
>                      Java Version: 1.8.0_144
>            Reporter: ZanderXu
>            Assignee: ZanderXu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0, 3.2.3, 3.3.2
>         Attachments: HDFS-14099-trunk-001.patch, HDFS-14099-trunk-002.patch, HDFS-14099-trunk-003.patch
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> We need to use the ZSTD compression algorithm in Hadoop, so I wrote a simple demo like this for testing:
>
> {code:java}
> // code placeholder
> while ((size = fsDataInputStream.read(bufferV2)) > 0) {
>   countSize += size;
>   if (countSize == 65536 * 8) {
>     if (!isFinished) {
>       // finish a frame in zstd
>       cmpOut.finish();
>       isFinished = true;
>     }
>     fsDataOutputStream.flush();
>     fsDataOutputStream.hflush();
>   }
>   if (isFinished) {
>     LOG.info("Will resetState. N=" + n);
>     // reset the stream and write again
>     cmpOut.resetState();
>     isFinished = false;
>   }
>   cmpOut.write(bufferV2, 0, size);
>   bufferV2 = new byte[5 * 1024 * 1024];
>   n++;
> }
> {code}
>
> I then used "*hadoop fs -text*" to read this file, and it failed. The error is as below.
> {code:java}
> Exception in thread "main" java.lang.InternalError: Unknown frame descriptor
>   at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
>   at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:181)
>   at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
>   at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
>   at java.io.InputStream.read(InputStream.java:101)
>   at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:98)
>   at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:66)
>   at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:127)
>   at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:101)
>   at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:96)
>   at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
>   at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:303)
>   at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:285)
>   at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:269)
>   at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:119)
>   at org.apache.hadoop.fs.shell.Command.run(Command.java:176)
>   at org.apache.hadoop.fs.FsShell.run(FsShell.java:328)
>   at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
>   at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
>   at org.apache.hadoop.fs.FsShell.main(FsShell.java:391)
> {code}
>
> So I looked into the code, including the JNI code, and found this bug. The *ZSTD_initDStream(stream)* method may be called twice for the same *Frame*.
> The first call is in *ZStandardDecompressor.c*:
>
> {code:java}
> if (size == 0) {
>     (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE);
>     size_t result = dlsym_ZSTD_initDStream(stream);
>     if (dlsym_ZSTD_isError(result)) {
>         THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
>         return (jint) 0;
>     }
> }
> {code}
>
> This call is correct, but *Finished* is never set back to false, even if there is more data (a new frame) in *CompressedBuffer* or *UserBuffer* that still needs to be decompressed.
> The second call happens in *org.apache.hadoop.io.compress.DecompressorStream* via *decompressor.reset()*, because *Finished* is always true after a *Frame* has been decompressed:
>
> {code:java}
> if (decompressor.finished()) {
>   // First see if there was any leftover buffered input from previous
>   // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
>   // all done; else reset, fix up input buffer, and get ready for next
>   // concatenated substream/"member".
>   int nRemaining = decompressor.getRemaining();
>   if (nRemaining == 0) {
>     int m = getCompressedData();
>     if (m == -1) {
>       // apparently the previous end-of-stream was also end-of-file:
>       // return success, as if we had never called getCompressedData()
>       eof = true;
>       return -1;
>     }
>     decompressor.reset();
>     decompressor.setInput(buffer, 0, m);
>     lastBytesSent = m;
>   } else {
>     // looks like it's a concatenated stream: reset low-level zlib (or
>     // other engine) and buffers, then "resend" remaining input data
>     decompressor.reset();
>     int leftoverOffset = lastBytesSent - nRemaining;
>     assert (leftoverOffset >= 0);
>     // this recopies userBuf -> direct buffer if using native libraries:
>     decompressor.setInput(buffer, leftoverOffset, nRemaining);
>     // NOTE: this is the one place we do NOT want to save the number
>     // of bytes sent (nRemaining here) into lastBytesSent: since we
>     // are resending what we've already sent before, offset is nonzero
>     // in general (only way it could be zero is if it already equals
>     // nRemaining), which would then screw up the offset calculation
>     // _next_ time around.  IOW, getRemaining() is in terms of the
>     // original, zero-offset bufferload, so lastBytesSent must be as
>     // well.  Cheesy ASCII art:
>     //
>     //          <------------ m, lastBytesSent ----------->
>     //   +===============================================+
>     //   buffer: |1111111111|22222222222222222|333333333333|    |
>     //   +===============================================+
>     //   #1:     <-- off -->|<-------- nRemaining --------->
>     //   #2:     <----------- off ----------->|<-- nRem. -->
>     //   #3:     (final substream: nRemaining == 0; eof = true)
>     //
>     //   If lastBytesSent is anything other than m, as shown, then "off"
>     //   will be calculated incorrectly.
>   }
> }
> {code}
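For completeness, here is a self-contained condensation of the demo quoted above. It uses only APIs already mentioned in this thread (ZStandardCodec, CompressionOutputStream.finish()/resetState()); the class and variable names are mine, it requires hadoop-common with the native zstd bindings loaded, and the expected failure on affected versions is as described in the report, not something verified here.

{code:java}
// Hypothetical, self-contained sketch of the multi-frame scenario from the
// quoted demo: write two zstd frames into one stream via finish() and
// resetState(), then read the stream back through the same codec. On
// affected versions the read is reported to fail with "Unknown frame
// descriptor"; on fixed versions it should return the original bytes.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.ZStandardCodec;

public class MultiFrameZstdRepro {
  public static void main(String[] args) throws IOException {
    ZStandardCodec codec = new ZStandardCodec();
    codec.setConf(new Configuration());

    byte[] part1 = new byte[64 * 1024];
    byte[] part2 = new byte[64 * 1024];
    Arrays.fill(part1, (byte) 'a');
    Arrays.fill(part2, (byte) 'b');

    // One underlying stream, two zstd frames.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(part1);
    out.finish();       // ends the first frame
    out.resetState();   // prepares the compressor for a new frame
    out.write(part2);
    out.close();        // ends the second frame

    // Read both frames back through the codec.
    ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
    try (InputStream in =
             codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        decompressed.write(buf, 0, n);
      }
    }

    byte[] expected = new byte[part1.length + part2.length];
    System.arraycopy(part1, 0, expected, 0, part1.length);
    System.arraycopy(part2, 0, expected, part1.length, part2.length);
    System.out.println("round trip ok: "
        + Arrays.equals(expected, decompressed.toByteArray()));
  }
}
{code}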
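When chasing "Unknown frame descriptor" errors like the ones above, it can also help to check how many zstd frames a byte stream actually contains. The rough aid below (class name mine, independent of Hadoop) scans for the standard zstd frame magic number 0xFD2FB528, which appears in the stream as the little-endian bytes 28 B5 2F FD. It ignores skippable frames, and because the same byte pattern can occur inside compressed data, hits are hints rather than proof of a frame boundary.

{code:java}
// Rough debugging aid: list the offsets at which standard zstd frames may
// start. The zstd frame magic number is 0xFD2FB528, stored little-endian in
// the stream as the bytes 28 B5 2F FD; skippable frames use other magics
// and are not detected here.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ZstdFrameScanner {
  public static void main(String[] args) throws IOException {
    byte[] data = Files.readAllBytes(Paths.get(args[0]));
    for (int i = 0; i + 4 <= data.length; i++) {
      if ((data[i] & 0xFF) == 0x28 && (data[i + 1] & 0xFF) == 0xB5
          && (data[i + 2] & 0xFF) == 0x2F && (data[i + 3] & 0xFF) == 0xFD) {
        System.out.println("possible zstd frame start at offset " + i);
      }
    }
  }
}
{code}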