[ https://issues.apache.org/jira/browse/HDFS-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923156#comment-17923156 ]

Sungwoo Park edited comment on HDFS-14099 at 2/3/25 2:38 AM:
-------------------------------------------------------------

I still see this problem with Hadoop 3.3.6 when the input is a large file 
(e.g., 25MB) that does not compress well. (It seems I cannot attach a file 
larger than 20MB.)

For the Java library, I used zstd-jni-1.5.5-1.jar, and the native Zstd library is 
version 1.5.5-1.

The zstd command-line tool successfully compresses and decompresses the sample file.

When tested with a simple Java program, ZStandardCodec fails to restore the 
original data.


{code:java}
void test(byte[] wrapperData, int wrapperSize) throws IOException {
    Configuration conf = new Configuration();
    ZStandardCodec codec = new ZStandardCodec();
    codec.setConf(conf);

    // Compress the data
    ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
    try (OutputStream compressor = codec.createOutputStream(compressedOut)) {
        compressor.write(wrapperData, 0, wrapperSize);
    }
    byte[] compressedData = compressedOut.toByteArray();
    LOG.error("xxxxx Compressed Data length: " + compressedData.length);

    // Decompress the data
    ByteArrayInputStream compressedIn = new ByteArrayInputStream(compressedData);
    ByteArrayOutputStream decompressedOut = new ByteArrayOutputStream();
    try (InputStream decompressor = codec.createInputStream(compressedIn)) {
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = decompressor.read(buffer)) != -1) {
            decompressedOut.write(buffer, 0, bytesRead);
        }
    }
    byte[] decompressedData = decompressedOut.toByteArray();
    LOG.error("xxxxx Decompressed length: " + decompressedData.length);

    // Compare the original and decompressed data
    if (Arrays.equals(Arrays.copyOf(wrapperData, wrapperSize), decompressedData)) {
        LOG.info("Compression and decompression are successful. Data matches.");
    } else {
        LOG.error("Data mismatch! Compression or decompression failed.");
    }
}
{code}

A few examples of calling test():

2025-02-03 02:11:42,012 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 1009
2025-02-03 02:11:42,013 [DAG-1-5-1] INFO  org.apache.hadoop.io.compress.CodecPool [] - Got brand-new decompressor [.zst]
2025-02-03 02:11:42,013 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 1000
2025-02-03 02:11:42,013 [DAG-1-5-1] INFO  org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,029 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 10009
2025-02-03 02:11:42,029 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 10000
2025-02-03 02:11:42,029 [DAG-1-5-1] INFO  org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,063 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 100009
2025-02-03 02:11:42,064 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 100000
2025-02-03 02:11:42,064 [DAG-1-5-1] INFO  org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,142 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 1000051
2025-02-03 02:11:42,143 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 1000000
2025-02-03 02:11:42,144 [DAG-1-5-1] INFO  org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,144 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxx testing pokemon.csv after loading byte[]
2025-02-03 02:11:42,155 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 19011
2025-02-03 02:11:42,155 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 45254
2025-02-03 02:11:42,155 [DAG-1-5-1] INFO  org.apache.tez.runtime.library.common.sort.impl.IFile [] - Compression and decompression are successful. Data matches.
2025-02-03 02:11:42,339 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxx testing file.zst after loading byte[]
2025-02-03 02:11:42,460 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Compressed Data length: 26633605
2025-02-03 02:11:42,494 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxxx Decompressed length: 26632390
2025-02-03 02:11:42,500 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - Data mismatch! Compression or decompression failed.
2025-02-03 02:11:42,500 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxx testing file.zst by reading directly
2025-02-03 02:11:42,501 [DAG-1-5-1] ERROR org.apache.tez.runtime.library.common.sort.impl.IFile [] - xxxx7 error
java.lang.InternalError: Unknown frame descriptor
        at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method) ~[hadoop-common-3.3.6.jar:?]
        at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:188) ~[hadoop-common-3.3.6.jar:?]
        at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111) ~[hadoop-common-3.3.6.jar:?]
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105) ~[hadoop-common-3.3.6.jar:?]
        at java.io.InputStream.read(InputStream.java:220) ~[?:?]


> Unknown frame descriptor when decompressing multiple frames in 
> ZStandardDecompressor
> ------------------------------------------------------------------------------------
>
>                 Key: HDFS-14099
>                 URL: https://issues.apache.org/jira/browse/HDFS-14099
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: compress, io
>    Affects Versions: 3.4.0, 3.2.3, 3.3.2
>         Environment: Hadoop Version: hadoop-3.0.3
> Java Version: 1.8.0_144
>            Reporter: ZanderXu
>            Assignee: ZanderXu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0, 3.2.3, 3.3.2
>
>         Attachments: HDFS-14099-trunk-001.patch, HDFS-14099-trunk-002.patch, 
> HDFS-14099-trunk-003.patch
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> We need to use the ZSTD compression algorithm in Hadoop, so I wrote a simple 
> demo like this for testing.
> {code:java}
> // compress the input in multiple zstd frames: finish() ends the current
> // frame, resetState() starts a new one
> while ((size = fsDataInputStream.read(bufferV2)) > 0) {
>   countSize += size;
>   if (countSize == 65536 * 8) {
>     if (!isFinished) {
>       // finish a frame in zstd
>       cmpOut.finish();
>       isFinished = true;
>     }
>     fsDataOutputStream.flush();
>     fsDataOutputStream.hflush();
>   }
>   if (isFinished) {
>     LOG.info("Will resetState. N=" + n);
>     // reset the stream and write again
>     cmpOut.resetState();
>     isFinished = false;
>   }
>   cmpOut.write(bufferV2, 0, size);
>   bufferV2 = new byte[5 * 1024 * 1024];
>   n++;
> }
> {code}
>  
> Then I used *hadoop fs -text* to read this file, and it failed. The error is 
> shown below.
> {code:java}
> Exception in thread "main" java.lang.InternalError: Unknown frame descriptor
> at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
> at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:181)
> at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
> at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:98)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:66)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:127)
> at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:101)
> at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:96)
> at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
> at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:303)
> at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:285)
> at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:269)
> at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:119)
> at org.apache.hadoop.fs.shell.Command.run(Command.java:176)
> at org.apache.hadoop.fs.FsShell.run(FsShell.java:328)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
> at org.apache.hadoop.fs.FsShell.main(FsShell.java:391)
> {code}
>  
> So I looked into the code, including the JNI part, and found this bug: the 
> *ZSTD_initDStream(stream)* method may be called twice within the same *Frame*.
> The first call is in *ZStandardDecompressor.c*:
> {code:c}
> if (size == 0) {
>     (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE);
>     size_t result = dlsym_ZSTD_initDStream(stream);
>     if (dlsym_ZSTD_isError(result)) {
>         THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
>         return (jint) 0;
>     }
> }
> {code}
> This call is correct, but *finished* is never set back to false, even if there 
> is still data (a new frame) in the *CompressedBuffer* or *UserBuffer* that needs 
> to be decompressed.
> The second call is made from *org.apache.hadoop.io.compress.DecompressorStream* 
> via *decompressor.reset()*, because *finished* is always true after a *Frame* 
> has been decompressed:
> {code:java}
> if (decompressor.finished()) {
>   // First see if there was any leftover buffered input from previous
>   // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
>   // all done; else reset, fix up input buffer, and get ready for next
>   // concatenated substream/"member".
>   int nRemaining = decompressor.getRemaining();
>   if (nRemaining == 0) {
>     int m = getCompressedData();
>     if (m == -1) {
>       // apparently the previous end-of-stream was also end-of-file:
>       // return success, as if we had never called getCompressedData()
>       eof = true;
>       return -1;
>     }
>     decompressor.reset();
>     decompressor.setInput(buffer, 0, m);
>     lastBytesSent = m;
>   } else {
>     // looks like it's a concatenated stream:  reset low-level zlib (or
>     // other engine) and buffers, then "resend" remaining input data
>     decompressor.reset();
>     int leftoverOffset = lastBytesSent - nRemaining;
>     assert (leftoverOffset >= 0);
>     // this recopies userBuf -> direct buffer if using native libraries:
>     decompressor.setInput(buffer, leftoverOffset, nRemaining);
>     // NOTE:  this is the one place we do NOT want to save the number
>     // of bytes sent (nRemaining here) into lastBytesSent:  since we
>     // are resending what we've already sent before, offset is nonzero
>     // in general (only way it could be zero is if it already equals
>     // nRemaining), which would then screw up the offset calculation
>     // _next_ time around.  IOW, getRemaining() is in terms of the
>     // original, zero-offset bufferload, so lastBytesSent must be as
>     // well.  Cheesy ASCII art:
>     //
>     //          <------------ m, lastBytesSent ----------->
>     //          +===============================================+
>     // buffer:  |1111111111|22222222222222222|333333333333|     |
>     //          +===============================================+
>     //     #1:  <-- off -->|<-------- nRemaining --------->
>     //     #2:  <----------- off ----------->|<-- nRem. -->
>     //     #3:  (final substream:  nRemaining == 0; eof = true)
>     //
>     // If lastBytesSent is anything other than m, as shown, then "off"
>     // will be calculated incorrectly.
>   }
> }{code}
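The concatenated-substream handling that DecompressorStream implements above is the same contract that java.util.zip's GZIPInputStream fulfills natively for concatenated gzip members. As an analogy only (gzip, not the Hadoop zstd codec), the sketch below compresses two chunks independently, concatenates the members, and reads them back as one stream; this is exactly the step that fails for zstd when the native *finished* flag is never cleared:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ConcatenatedMembers {
    /** Compresses data as one complete gzip member. */
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    /** Drains the stream fully into a byte array. */
    static byte[] readAll(GZIPInputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Compress two chunks independently, then concatenate the members.
        ByteArrayOutputStream concat = new ByteArrayOutputStream();
        concat.write(gzip("first member;".getBytes(StandardCharsets.UTF_8)));
        concat.write(gzip("second member".getBytes(StandardCharsets.UTF_8)));

        // GZIPInputStream detects the next member header at the boundary and
        // keeps decompressing instead of stopping after the first member.
        GZIPInputStream in = new GZIPInputStream(
            new ByteArrayInputStream(concat.toByteArray()));
        String restored = new String(readAll(in), StandardCharsets.UTF_8);
        System.out.println(restored); // prints "first member;second member"
    }
}
```

A zstd decompressor that cleared its finished flag on reset() would give multi-frame input the same round-trip behavior as this gzip example.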



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
