[ 
https://issues.apache.org/jira/browse/PARQUET-2196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17610209#comment-17610209
 ] 

ASF GitHub Bot commented on PARQUET-2196:
-----------------------------------------

shangxinli commented on code in PR #1000:
URL: https://github.com/apache/parquet-mr/pull/1000#discussion_r981648516


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+abstract public class NonBlockedDecompressor implements Decompressor {
+
+  // Buffer for uncompressed output. This buffer grows as necessary.
+  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
+
+  // Buffer for compressed input. This buffer grows as necessary.
+  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
+
+  private boolean finished;
+
+  /**
+   * Fills specified buffer with uncompressed data. Returns actual number
+   * of bytes of uncompressed data. A return value of 0 indicates that
+   * {@link #needsInput()} should be called in order to determine if more
+   * input data is required.
+   *
+   * @param buffer Buffer for the uncompressed data
+   * @param off    Start offset of the data
+   * @param len    Size of the buffer
+   * @return The actual number of bytes of uncompressed data.
+   * @throws IOException if reading or decompression fails
+   */
+  @Override
+  public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
+    SnappyUtil.validateBuffer(buffer, off, len);
+    if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
+      return 0;
+    }
+
+    if (!outputBuffer.hasRemaining()) {
+      inputBuffer.rewind();
+      Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0.");
+      Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0.");
+      // There is compressed input, decompress it now.
+      int decompressedSize = uncompressedLength(inputBuffer, len);
+      if (decompressedSize > outputBuffer.capacity()) {
+        ByteBuffer oldBuffer = outputBuffer;
+        outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
+        CleanUtil.cleanDirectBuffer(oldBuffer);
+      }
+
+      // Reset the previous outputBuffer (i.e. set position to 0)
+      outputBuffer.clear();
+      int size = uncompress(inputBuffer, outputBuffer);
+      outputBuffer.limit(size);
+      // We've decompressed the entire input, reset the input now
+      inputBuffer.clear();
+      inputBuffer.limit(0);
+      finished = true;
+    }
+
+    // Return uncompressed output up to 'len'
+    int numBytes = Math.min(len, outputBuffer.remaining());
+    outputBuffer.get(buffer, off, numBytes);
+    return numBytes;
+  }
+
+  /**
+   * Sets input data for decompression.
+   * This should be called if and only if {@link #needsInput()} returns
+   * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified.  With this requirement, an extra
+   * buffer-copy can be avoided.)
+   *
+   * @param buffer Input data
+   * @param off    Start offset
+   * @param len    Length
+   */
+  @Override
+  public synchronized void setInput(byte[] buffer, int off, int len) {
+    SnappyUtil.validateBuffer(buffer, off, len);

Review Comment:
   Should we refactor SnappyUtil as well? This base class is not Snappy-specific, yet it still calls SnappyUtil.validateBuffer.
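
   To make the question concrete, here is a minimal sketch of a codec-neutral bounds check. The class name `CodecBufferUtil` is hypothetical and is not part of this PR or of parquet-mr; the sketch uses only the JDK rather than `org.apache.parquet.Preconditions`, and it assumes the helper only needs to reject null buffers and out-of-range offset/length pairs.

```java
import java.util.Objects;

// Hypothetical codec-neutral helper (name is an assumption, not parquet-mr API).
final class CodecBufferUtil {

  private CodecBufferUtil() {
    // Static utility; not meant to be instantiated.
  }

  // Rejects a null buffer, negative off/len, and ranges that run past the array end.
  // Written as "off <= buffer.length - len" so that off + len cannot overflow int.
  static void validateBuffer(byte[] buffer, int off, int len) {
    Objects.requireNonNull(buffer, "buffer");
    if (off < 0 || len < 0 || off > buffer.length - len) {
      throw new IllegalArgumentException(
          "Invalid buffer offset or length: buffer.length=" + buffer.length
              + " off=" + off + " len=" + len);
    }
  }
}
```

   With something like this, both the Snappy classes and NonBlockedDecompressor could call the shared helper instead of reaching into a Snappy-named utility.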





> Support LZ4_RAW codec
> ---------------------
>
>                 Key: PARQUET-2196
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2196
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Gang Wu
>            Priority: Major
>
> There is a long history of LZ4 interoperability problems for Parquet files 
> between parquet-mr and parquet-cpp (which is now part of Apache Arrow). 
> The attached links provide the evidence. In short, a new LZ4_RAW codec type 
> was introduced in parquet-format v2.9.0. However, only parquet-cpp supports 
> LZ4_RAW. The parquet-mr library still uses the old Hadoop-provided LZ4 codec 
> and cannot read Parquet files compressed with LZ4_RAW.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
