[
https://issues.apache.org/jira/browse/PARQUET-2196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17610209#comment-17610209
]
ASF GitHub Bot commented on PARQUET-2196:
-----------------------------------------
shangxinli commented on code in PR #1000:
URL: https://github.com/apache/parquet-mr/pull/1000#discussion_r981648516
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public abstract class NonBlockedDecompressor implements Decompressor {
+
+ // Buffer for uncompressed output. This buffer grows as necessary.
+ private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
+
+ // Buffer for compressed input. This buffer grows as necessary.
+ private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
+
+ private boolean finished;
+
+ /**
+ * Fills specified buffer with uncompressed data. Returns actual number
+ * of bytes of uncompressed data. A return value of 0 indicates that
+ * {@link #needsInput()} should be called in order to determine if more
+ * input data is required.
+ *
+ * @param buffer Buffer for the compressed data
+ * @param off Start offset of the data
+ * @param len Size of the buffer
+ * @return The actual number of bytes of uncompressed data.
+ * @throws IOException if reading or decompression fails
+ */
+ @Override
+  public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
+ SnappyUtil.validateBuffer(buffer, off, len);
+ if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
+ return 0;
+ }
+
+ if (!outputBuffer.hasRemaining()) {
+ inputBuffer.rewind();
+      Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0.");
+      Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0.");
+ // There is compressed input, decompress it now.
+ int decompressedSize = uncompressedLength(inputBuffer, len);
+ if (decompressedSize > outputBuffer.capacity()) {
+ ByteBuffer oldBuffer = outputBuffer;
+ outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
+ CleanUtil.cleanDirectBuffer(oldBuffer);
+ }
+
+ // Reset the previous outputBuffer (i.e. set position to 0)
+ outputBuffer.clear();
+ int size = uncompress(inputBuffer, outputBuffer);
+ outputBuffer.limit(size);
+ // We've decompressed the entire input, reset the input now
+ inputBuffer.clear();
+ inputBuffer.limit(0);
+ finished = true;
+ }
+
+    // Return decompressed output up to 'len'
+ int numBytes = Math.min(len, outputBuffer.remaining());
+ outputBuffer.get(buffer, off, numBytes);
+ return numBytes;
+ }
+
+ /**
+ * Sets input data for decompression.
+ * This should be called if and only if {@link #needsInput()} returns
+ * <code>true</code> indicating that more input data is required.
+ * (Both native and non-native versions of various Decompressors require
+ * that the data passed in via <code>b[]</code> remain unmodified until
+ * the caller is explicitly notified--via {@link #needsInput()}--that the
+ * buffer may be safely modified. With this requirement, an extra
+ * buffer-copy can be avoided.)
+ *
+ * @param buffer Input data
+ * @param off Start offset
+ * @param len Length
+ */
+ @Override
+ public synchronized void setInput(byte[] buffer, int off, int len) {
+ SnappyUtil.validateBuffer(buffer, off, len);
Review Comment:
Should we refactor this SnappyUtil also?
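The `decompress` method in the diff above uses a grow-on-demand direct-buffer pattern: when the decompressed size exceeds the current output buffer's capacity, a larger direct buffer is allocated and the old one is released via `CleanUtil.cleanDirectBuffer`. A minimal plain-JDK sketch of that pattern (the `ensureCapacity` helper name is hypothetical, and the eager cleanup of the old buffer is elided here):

```java
import java.nio.ByteBuffer;

public class GrowableBuffer {
    // Grow-on-demand: reuse the existing direct buffer if it is large
    // enough, otherwise allocate a bigger one. In the PR, the old buffer
    // would additionally be freed via CleanUtil.cleanDirectBuffer.
    static ByteBuffer ensureCapacity(ByteBuffer current, int needed) {
        if (needed <= current.capacity()) {
            current.clear(); // reuse: position = 0, limit = capacity
            return current;
        }
        return ByteBuffer.allocateDirect(needed);
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.allocateDirect(8);
        ByteBuffer grown = ensureCapacity(small, 64);
        System.out.println(grown.capacity() >= 64);          // true: grew
        System.out.println(ensureCapacity(grown, 16) == grown); // true: reused
    }
}
```

Direct buffers are not reclaimed promptly by the garbage collector, which is why the PR cleans the old buffer explicitly instead of waiting for GC.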
> Support LZ4_RAW codec
> ---------------------
>
> Key: PARQUET-2196
> URL: https://issues.apache.org/jira/browse/PARQUET-2196
> Project: Parquet
> Issue Type: Improvement
> Components: parquet-mr
> Reporter: Gang Wu
> Priority: Major
>
> There is a long history of LZ4 interoperability issues with parquet files
> exchanged between parquet-mr and parquet-cpp (now part of Apache Arrow).
> The attached links provide the evidence. In short, a new LZ4_RAW codec type
> was introduced in parquet format v2.9.0. However, only parquet-cpp supports
> LZ4_RAW; the parquet-mr library still uses the old Hadoop-provided LZ4 codec
> and cannot read parquet files written with LZ4_RAW.
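The interoperability gap comes down to framing: the Hadoop-style LZ4 codec prefixes each block with two 4-byte big-endian length fields (uncompressed size, then compressed size) before the raw LZ4 data, while LZ4_RAW stores only the raw LZ4 block with no prefixes. A hedged plain-JDK sketch of parsing that Hadoop-style prefix (the exact framing depends on the Hadoop codec implementation, and the payload below is dummy bytes, not real LZ4 data):

```java
import java.nio.ByteBuffer;

public class HadoopLz4Frame {
    // Hadoop-style block framing as described in the interoperability
    // threads: 4-byte big-endian uncompressed length, 4-byte big-endian
    // compressed length, then the compressed bytes. LZ4_RAW drops both
    // prefixes, so a reader expecting one format cannot parse the other.
    static int[] readBlockHeader(byte[] frame) {
        ByteBuffer bb = ByteBuffer.wrap(frame); // big-endian by default
        int uncompressedLen = bb.getInt();
        int compressedLen = bb.getInt();
        return new int[] { uncompressedLen, compressedLen };
    }

    public static void main(String[] args) {
        // Dummy frame: claims 100 uncompressed bytes, 3 compressed bytes follow.
        byte[] frame = ByteBuffer.allocate(11)
            .putInt(100).putInt(3)
            .put(new byte[] { 1, 2, 3 })
            .array();
        int[] header = readBlockHeader(frame);
        System.out.println(header[0] + " " + header[1]); // 100 3
    }
}
```

A parquet-cpp reader expecting a bare LZ4 block would misinterpret those eight prefix bytes as compressed data, which is why the two implementations could not read each other's files before LZ4_RAW.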
--
This message was sent by Atlassian Jira
(v8.20.10#820010)