IMPALA-3079: Fix sequence file writer

This change fixes the following issues in the Sequence File Writer:

1. ReadWriteUtil::VLongRequiredBytes() and ReadWriteUtil::PutVLong()
   were broken. As a result, Impala created corrupt uncompressed
   sequence files.

2. KEY_CLASS_NAME was missing from the sequence file header. As a
   result, Hive could not read back uncompressed sequence files
   created by Impala.

3. Impala created record-compressed sequence files with an empty keys
   block. As a result, Hive could not read back record-compressed
   sequence files created by Impala.

4. Impala created block-compressed files with:
   - empty key-lengths block
   - empty keys block
   - empty value-lengths block
   This resulted in invalid block-compressed sequence files that Hive
   could not read back.

5. In some cases the wrong record-compression flag was written to the
   sequence file header. As a result, Hive could not read back
   record-compressed sequence files created by Impala.

6. Impala added 'sync_marker' instead of 'neg1_sync_marker' to the
   beginning of blocks in block-compressed sequence files. Hive could
   not read these files back.

7. The calculation of block sizes in the SnappyBlockCompressor class
   was incorrect for odd-length buffers.

Change-Id: I0db642ad35132a9a5a6611810a6cafbbe26e7487
Reviewed-on: http://gerrit.cloudera.org:8080/6107
Reviewed-by: Michael Ho <[email protected]>
Reviewed-by: Attila Jeges <[email protected]>
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59b2db6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59b2db6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59b2db6b

Branch: refs/heads/master
Commit: 59b2db6ba722e5bef297bb4603519e06333ce5cb
Parents: edcc593
Author: Attila Jeges <[email protected]>
Authored: Mon Feb 20 18:07:25 2017 +0100
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Apr 25 21:07:53 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-sequence-table-writer.cc   | 105 +++++++---
 be/src/exec/hdfs-sequence-table-writer.h    |  71 ++++++-
 be/src/exec/read-write-util-test.cc         |  57 ++++++
 be/src/exec/read-write-util.h               |  64 +++++--
 be/src/util/compress.cc                     |  35 ++--
 be/src/util/decompress-test.cc              |   9 +-
 .../queries/QueryTest/seq-writer.test       | 192 ++++++++++++++++++-
 tests/query_test/test_compressed_formats.py |  39 +++-
 8 files changed, 495 insertions(+), 77 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 3c522ba..9af7e0f 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -38,8 +38,10 @@ namespace impala {
-uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
+const uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
 const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text";
+const char* HdfsSequenceTableWriter::KEY_CLASS_NAME =
+    "org.apache.hadoop.io.BytesWritable";
 HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
     RuntimeState* state, OutputPartition* output,
@@ -130,24 +132,25 @@ Status HdfsSequenceTableWriter::AppendRows(
 }
 Status HdfsSequenceTableWriter::WriteFileHeader() {
-  out_.WriteBytes(sizeof(SEQ6_CODE), reinterpret_cast<uint8_t*>(SEQ6_CODE));
+  out_.WriteBytes(sizeof(SEQ6_CODE), SEQ6_CODE);
-  // Output an empty
KeyClassName field - out_.WriteEmptyText(); + // Setup to be correct key class + out_.WriteText(strlen(KEY_CLASS_NAME), + reinterpret_cast<const uint8_t*>(KEY_CLASS_NAME)); // Setup to be correct value class out_.WriteText(strlen(VALUE_CLASS_NAME), - reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME)); + reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME)); // Flag for if compression is used out_.WriteBoolean(compress_flag_); // Only valid if compression is used. Indicates if block compression is used. - out_.WriteBoolean(!record_compression_); + out_.WriteBoolean(compress_flag_ && !record_compression_); // Output the name of our compression codec, parsed by readers if (compress_flag_) { out_.WriteText(codec_name_.size(), - reinterpret_cast<const uint8_t*>(codec_name_.data())); + reinterpret_cast<const uint8_t*>(codec_name_.data())); } // Meta data is formated as an integer N followed by N*2 strings, @@ -164,35 +167,63 @@ Status HdfsSequenceTableWriter::WriteFileHeader() { } Status HdfsSequenceTableWriter::WriteCompressedBlock() { - WriteStream header; + WriteStream record; + uint8_t *output; + int64_t output_length; DCHECK(compress_flag_); - // add a sync marker to start of the block - header.WriteBytes(sync_marker_.size(), sync_marker_.data()); + // Add a sync marker to start of the block + record.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data()); - header.WriteVLong(unflushed_rows_); + // Output the number of rows in this block + record.WriteVLong(unflushed_rows_); - // Write Key Lengths and Key Values - header.WriteEmptyText(); - header.WriteEmptyText(); + // Output compressed key-lengths block-size & compressed key-lengths block. + // The key-lengths block contains byte value of 4 as a key length for each row (this is + // what Hive does). + string key_lengths_text(unflushed_rows_, '\x04'); + { + SCOPED_TIMER(parent_->compress_timer()); + RETURN_IF_ERROR(compressor_->ProcessBlock(false, key_lengths_text.size(), + reinterpret_cast<uint8_t*>(&key_lengths_text[0]), &output_length, &output)); + } + record.WriteVInt(output_length); + record.WriteBytes(output_length, output); - // Output an Empty string for value Lengths - header.WriteEmptyText(); + // Output compressed keys block-size & compressed keys block. + // The keys block contains "\0\0\0\0" byte sequence as a key for each row (this is what + // Hive does). 
+ string keys_text(unflushed_rows_ * 4, '\0'); + { + SCOPED_TIMER(parent_->compress_timer()); + RETURN_IF_ERROR(compressor_->ProcessBlock(false, keys_text.size(), + reinterpret_cast<uint8_t*>(&keys_text[0]), &output_length, &output)); + } + record.WriteVInt(output_length); + record.WriteBytes(output_length, output); - uint8_t *output; - int64_t output_length; + // Output compressed value-lengths block-size & compressed value-lengths block + string value_lengths_text = out_value_lengths_block_.String(); + { + SCOPED_TIMER(parent_->compress_timer()); + RETURN_IF_ERROR(compressor_->ProcessBlock(false, value_lengths_text.size(), + reinterpret_cast<uint8_t*>(&value_lengths_text[0]), &output_length, &output)); + } + record.WriteVInt(output_length); + record.WriteBytes(output_length, output); + + // Output compressed values block-size & compressed values block string text = out_.String(); { SCOPED_TIMER(parent_->compress_timer()); RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(), reinterpret_cast<uint8_t*>(&text[0]), &output_length, &output)); } + record.WriteVInt(output_length); + record.WriteBytes(output_length, output); - header.WriteVInt(output_length); - string head = header.String(); - RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(head.data()), - head.size())); - RETURN_IF_ERROR(Write(output, output_length)); + string rec = record.String(); + RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(rec.data()), rec.size())); return Status::OK(); } @@ -237,11 +268,15 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) { ++unflushed_rows_; row_buf_.Clear(); if (compress_flag_ && !record_compression_) { - // Output row for a block compressed sequence file - // write the length as a vlong and then write the contents + // Output row for a block compressed sequence file. + // Value block: Write the length as a vlong and then write the contents. EncodeRow(row, &row_buf_); out_.WriteVLong(row_buf_.Size()); out_.WriteBytes(row_buf_.Size(), row_buf_.String().data()); + // Value-lengths block: Write the number of bytes we have just written to out_ as + // vlong + out_value_lengths_block_.WriteVLong( + ReadWriteUtil::VLongRequiredBytes(row_buf_.Size()) + row_buf_.Size()); return Status::OK(); } @@ -249,7 +284,7 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) { const uint8_t* value_bytes; int64_t value_length; - if (record_compression_) { + if (compress_flag_) { // apply compression to row_buf_ // the length of the buffer must be prefixed to the buffer prior to compression // @@ -275,16 +310,22 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) { int rec_len = value_length; // if the record is compressed, the length is part of the compressed text // if not, then we need to write the length (below) and account for it's size - if (!record_compression_) rec_len += ReadWriteUtil::VLongRequiredBytes(value_length); + if (!compress_flag_) { + rec_len += ReadWriteUtil::VLongRequiredBytes(value_length); + } + // The record contains the key, account for it's size (we use "\0\0\0\0" byte sequence + // as a key just like Hive). + rec_len += 4; - // Length of the record (incl. key length and value length) + // Length of the record (incl. 
key and value length) out_.WriteInt(rec_len); - // Write length of the key (Impala/Hive doesn't write a key) - out_.WriteInt(0); + // Write length of the key and the key + out_.WriteInt(4); + out_.WriteBytes(4, "\0\0\0\0"); // if the record is compressed, the length is part of the compressed text - if (!record_compression_) out_.WriteVLong(value_length); + if (!compress_flag_) out_.WriteVLong(value_length); // write out the value (possibly compressed) out_.WriteBytes(value_length, value_bytes); @@ -304,6 +345,8 @@ Status HdfsSequenceTableWriter::Flush() { Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size())); } out_.Clear(); + out_value_lengths_block_.Clear(); + mem_pool_->FreeAll(); unflushed_rows_ = 0; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/hdfs-sequence-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h index 7f6a888..3d72926 100644 --- a/be/src/exec/hdfs-sequence-table-writer.h +++ b/be/src/exec/hdfs-sequence-table-writer.h @@ -36,6 +36,66 @@ class RuntimeState; struct StringValue; struct OutputPartition; +/// Sequence files are flat files consisting of binary key/value pairs. Essentially there +/// are 3 different formats for sequence files depending on the 'compression_codec' and +/// 'seq_compression_mode' query options: +/// - Uncompressed sequence file format +/// - Record-compressed sequence file format +/// - Block-compressed sequence file format +/// All of them share a common header described below. +/// +/// Sequence File Header +/// -------------------- +/// - version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number +/// (e.g. SEQ4 or SEQ6) +/// - keyClassName - key class +/// - valueClassName - value class +/// - compression - A boolean which specifies if compression is turned on for keys/values +/// in this file. +/// - blockCompression - A boolean which specifies if block-compression is turned on for +/// keys/values in this file. +/// - compression codec - compression codec class which is used for compression of keys +/// and/or values (if compression is enabled). +/// - metadata - SequenceFile.Metadata for this file. +/// - sync - A 16 byte sync marker to denote end of the header. +/// +/// Uncompressed Sequence File Format +/// --------------------------------- +/// - Header +/// - Record +/// - Record length +/// - Key length +/// - Key +/// - Value +/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so. +/// +/// Record-Compressed Sequence File Format +/// -------------------------------------- +/// - Header +/// - Record +/// - Record length +/// - Key length +/// - Key +/// - Compressed Value +/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so. +/// +/// Block-Compressed Sequence File Format +/// ------------------------------------- +/// - Header +/// - Record Block +/// - Uncompressed number of records in the block +/// - Compressed key-lengths block-size +/// - Compressed key-lengths block +/// - Compressed keys block-size +/// - Compressed keys block +/// - Compressed value-lengths block-size +/// - Compressed value-lengths block +/// - Compressed values block-size +/// - Compressed values block +/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every block. 
+/// The compressed blocks of key lengths and value lengths consist of the actual lengths +/// of individual keys/values encoded in zero-compressed integer format. + /// Consumes rows and outputs the rows into a sequence file in HDFS /// Output is buffered to fill sequence file blocks. class HdfsSequenceTableWriter : public HdfsTableWriter { @@ -67,7 +127,8 @@ class HdfsSequenceTableWriter : public HdfsTableWriter { /// writes the SEQ file header to HDFS Status WriteFileHeader(); - /// writes the contents of out_ as a single compressed block + /// writes the contents of out_value_lengths_block_ and out_ as a single + /// block-compressed record. Status WriteCompressedBlock(); /// writes the tuple row to the given buffer; separates fields by field_delim_, @@ -88,6 +149,10 @@ class HdfsSequenceTableWriter : public HdfsTableWriter { /// buffer which holds accumulated output WriteStream out_; + /// buffer which holds accumulated value-lengths output (used with block-compressed + /// sequence files) + WriteStream out_value_lengths_block_; + /// Temporary Buffer for a single row WriteStream row_buf_; @@ -119,10 +184,12 @@ class HdfsSequenceTableWriter : public HdfsTableWriter { /// A -1 infront of the sync marker, used in decompressed formats std::string neg1_sync_marker_; + /// Name of java class to use when reading the keys + static const char* KEY_CLASS_NAME; /// Name of java class to use when reading the values static const char* VALUE_CLASS_NAME; /// Magic characters used to identify the file type - static uint8_t SEQ6_CODE[4]; + static const uint8_t SEQ6_CODE[4]; }; } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/read-write-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/read-write-util-test.cc b/be/src/exec/read-write-util-test.cc index 0f9c6ae..e4448de 100644 --- a/be/src/exec/read-write-util-test.cc +++ b/be/src/exec/read-write-util-test.cc @@ -56,6 +56,63 @@ TEST(ReadWriteUtil, BigEndian) { TestBigEndian<uint64_t>(0xffffffffffffff); } +TEST(ReadWriteUtil, ZeroCompressedLongRequiredBytes) { + // Small longs stored in 1 byte + for (int64_t val = -112; val <= 127; val++) { + EXPECT_EQ(1, ReadWriteUtil::VLongRequiredBytes(val)); + } + // Small longs stored in 2 bytes + for (int64_t val = -128; val < -112; val++) { + EXPECT_EQ(2, ReadWriteUtil::VLongRequiredBytes(val)); + } + // Positive longs stored in 3-9 bytes + int64_t val = 0x7000ab00cd00ef00; + for (int sh = 0; sh <= 6; sh++) { + EXPECT_EQ(9 - sh, ReadWriteUtil::VLongRequiredBytes(val)); + val = val >> 8; + } + // Negative longs stored 3-9 bytes + val = 0x8000ab00cd00ef00; + for (int sh = 0; sh <= 6; sh++) { + EXPECT_EQ(9 - sh, ReadWriteUtil::VLongRequiredBytes(val)); + val = val >> 8; + } + //Max/min long is stored in 9 bytes + EXPECT_EQ(9, ReadWriteUtil::VLongRequiredBytes(0x7fffffffffffffff)); + EXPECT_EQ(9, ReadWriteUtil::VLongRequiredBytes(0x8000000000000000)); +} + +void TestPutGetZeroCompressedLong(int64_t val) { + uint8_t buffer[9]; + int64_t read_val; + int64_t num_bytes = ReadWriteUtil::PutVLong(val, buffer); + int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val); + EXPECT_EQ(read_bytes, num_bytes); + EXPECT_EQ(read_val, val); +} + +TEST(ReadWriteUtil, ZeroCompressedLong) { + //1 byte longs + for (int64_t val = -128; val <= 127; val++) { + TestPutGetZeroCompressedLong(val); + } + //2+ byte positive longs + int64_t val = 0x70100000200000ab; + for (int sh = 0; sh <= 6; sh++) { + 
TestPutGetZeroCompressedLong(val); + val = val >> 8; + } + //2+ byte negative longs + val = 0x80100000200000ab; + for (int sh = 0; sh <= 6; sh++) { + TestPutGetZeroCompressedLong(val); + val = val >> 8; + } + //Max/min long + TestPutGetZeroCompressedLong(0x7fffffffffffffff); + TestPutGetZeroCompressedLong(0x8000000000000000); +} + } IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/read-write-util.h ---------------------------------------------------------------------- diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h index c384839..84d41dd 100644 --- a/be/src/exec/read-write-util.h +++ b/be/src/exec/read-write-util.h @@ -60,11 +60,11 @@ class ReadWriteUtil { static int GetVInt(uint8_t* buf, int32_t* vint); /// Writes a variable-length Long or int value to a byte buffer. - /// Returns the number of bytes written + /// Returns the number of bytes written. static int64_t PutVLong(int64_t val, uint8_t* buf); static int64_t PutVInt(int32_t val, uint8_t* buf); - /// returns size of the encoded long value, not including the 1 byte for length + /// Returns size of the encoded long value, including the 1 byte for length. static int VLongRequiredBytes(int64_t val); /// Read a variable-length Long value from a byte buffer starting at the specified @@ -211,41 +211,63 @@ inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong) return len; } +// Returns size of the encoded long value, including the 1 byte for length for val < -112 +// or val > 127. inline int ReadWriteUtil::VLongRequiredBytes(int64_t val) { - // returns size of the encoded long value, not including the 1 byte for length - if (val & 0xFF00000000000000llu) return 8; - if (val & 0x00FF000000000000llu) return 7; - if (val & 0x0000FF0000000000llu) return 6; - if (val & 0x000000FF00000000llu) return 5; - if (val & 0x00000000FF000000llu) return 4; - if (val & 0x0000000000FF0000llu) return 3; - if (val & 0x000000000000FF00llu) return 2; - // Values between -112 and 127 are stored using 1 byte, - // values between -127 and -112 are stored using 2 bytes - // See ReadWriteUtil::DecodeVIntSize for this case - if (val < -112) return 2; - return 1; + if (val >= -112 && val <= 127) return 1; + // If 'val' is negtive, take the one's complement. + if (val < 0) val = ~val; + return 9 - __builtin_clzll(val)/8; } +// Serializes 'val' to a binary stream with zero-compressed encoding. For -112<=val<=127, +// only one byte is used with the actual value. For other values of 'val', the first byte +// value indicates whether the long is positive or negative, and the number of bytes that +// follow. If the first byte value v is between -113 and -120, the following long is +// positive, with number of bytes that follow are -(v+112). If the first byte value v is +// between -121 and -128, the following long is negative, with number of bytes that follow +// are -(v+120). Bytes are stored in the high-non-zero-byte-first order. Returns the +// number of bytes written. 
+// For more information, see the documentation for 'WritableUtils.writeVLong()' method: +// https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/WritableUtils.html inline int64_t ReadWriteUtil::PutVLong(int64_t val, uint8_t* buf) { int64_t num_bytes = VLongRequiredBytes(val); if (num_bytes == 1) { + DCHECK(val >= -112 && val <= 127); // store the value itself instead of the length buf[0] = static_cast<int8_t>(val); return 1; } // This is how we encode the length for a length less than or equal to 8 - buf[0] = -119 + num_bytes; + DCHECK_GE(num_bytes, 2); + DCHECK_LE(num_bytes, 9); + if (val < 0) { + DCHECK_LT(val, -112); + // The first byte in 'buf' should contain a value between -121 and -128 that makes the + // following condition true: -(buf[0] + 120) == num_bytes - 1. + // Note that 'num_bytes' includes the 1 extra byte for length. + buf[0] = -(num_bytes + 119); + // If 'val' is negtive, take the one's complement. + // See the source code for WritableUtils.writeVLong() method: + // https://hadoop.apache.org/docs/r2.7.2/api/src-html/org/apache/hadoop/io/ + // WritableUtils.html#line.271 + val = ~val; + } else { + DCHECK_GT(val, 127); + // The first byte in 'buf' should contain a value between -113 and -120 that makes the + // following condition true: -(buf[0] + 112) == num_bytes - 1. + // Note that 'num_bytes' includes the 1 extra byte for length. + buf[0] = -(num_bytes + 111); + } - // write to buffer in reversed endianness - for (int i = 0; i < num_bytes; ++i) { - buf[i+1] = (val >> (8 * (num_bytes - i - 1))) & 0xFF; + // write to the buffer in Big Endianness + for (int i = 1; i < num_bytes; ++i) { + buf[i] = (val >> (8 * (num_bytes - i - 1))) & 0xFF; } - // +1 for the length byte - return num_bytes + 1; + return num_bytes; } inline int64_t ReadWriteUtil::PutVInt(int32_t val, uint8_t* buf) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/util/compress.cc ---------------------------------------------------------------------- diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc index 429ae66..efa39bf 100644 --- a/be/src/util/compress.cc +++ b/be/src/util/compress.cc @@ -204,13 +204,27 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t *output_length, uint8_t** output) { DCHECK_GE(input_length, 0); - // Hadoop uses a block compression scheme on top of snappy. First there is - // an integer which is the size of the decompressed data followed by a - // sequence of compressed blocks each preceded with an integer size. - // For testing purposes we are going to generate two blocks. - int64_t block_size = input_length / 2; - size_t length = snappy::MaxCompressedLength(block_size) * 2; - length += 3 * sizeof (int32_t); + // Hadoop uses a block compression scheme on top of snappy. The layout is as follows: + // - size of the entire decompressed data (4 bytes) + // - size of the 1st compressed block (4 bytes) + // - 1st compressed block + // - size of the 2nd compressed block (4 bytes) + // - 2nd compressed block + // ... + // For testing purposes we are going to generate two blocks if input_length >= 4K. 
+ vector<int64_t> block_sizes; + size_t length; + if (input_length == 0) { + length = sizeof (int32_t); + } else if (input_length < 4 * 1024) { + block_sizes.push_back(input_length); + length = snappy::MaxCompressedLength(block_sizes[0]) + 2 * sizeof (int32_t); + } else { + block_sizes.push_back(input_length / 2); + block_sizes.push_back(input_length - block_sizes[0]); + length = snappy::MaxCompressedLength(block_sizes[0]) + + snappy::MaxCompressedLength(block_sizes[1]) + 3 * sizeof (int32_t); + } DCHECK(!output_preallocated || length <= *output_length); if (output_preallocated) { @@ -222,13 +236,12 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated, } uint8_t* outp = out_buffer_; - uint8_t* sizep; ReadWriteUtil::PutInt(outp, static_cast<uint32_t>(input_length)); outp += sizeof (int32_t); - while (input_length > 0) { + for (int64_t block_size: block_sizes) { // TODO: should this be a while or a do-while loop? Check what Hadoop does. // Point at the spot to store the compressed size. - sizep = outp; + uint8_t* sizep = outp; outp += sizeof (int32_t); size_t size; snappy::RawCompress(reinterpret_cast<const char*>(input), @@ -236,8 +249,8 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated, ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size)); input += block_size; - input_length -= block_size; outp += size; + DCHECK_LE(outp - out_buffer_, length); } *output = out_buffer_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/util/decompress-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc index 7709bb1..1f84bad 100644 --- a/be/src/util/decompress-test.cc +++ b/be/src/util/decompress-test.cc @@ -73,8 +73,15 @@ class DecompressorTest : public ::testing::Test { DecompressInsufficientOutputBuffer(compressor.get(), decompressor.get(), sizeof(input_), input_); } else { - CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_), + CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_), input_); + // Test with odd-length input (to test the calculation of block-sizes in + // SnappyBlockCompressor) + CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_) - 1, input_); + // Test with input length of 1024 (to test SnappyBlockCompressor with a single + // block) + CompressAndDecompress(compressor.get(), decompressor.get(), 1024, input_); + // Test with empty input if (format != THdfsCompression::BZIP2) { CompressAndDecompress(compressor.get(), decompressor.get(), 0, NULL); } else { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test index fe4d829..528b83e 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test +++ b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test @@ -1,8 +1,5 @@ ==== ---- QUERY -drop table if exists __seq_write; -==== ----- QUERY SET COMPRESSION_CODEC=NONE; SET ALLOW_UNSUPPORTED_FORMATS=1; SET SEQ_COMPRESSION_MODE=BLOCK; @@ -92,5 +89,192 @@ select * from __seq_write; INT,STRING,DOUBLE ==== ---- QUERY -drop table __seq_write; +# IMPALA-3079: Create a table containing larger seq files with NONE+RECORD and 
then read +# it back +SET COMPRESSION_CODEC=NONE; +SET SEQ_COMPRESSION_MODE=RECORD; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_none_rec like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_none_rec partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_none_rec; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with DEFAULT+RECORD and then +# read it back +SET COMPRESSION_CODEC=DEFAULT; +SET SEQ_COMPRESSION_MODE=RECORD; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_def_rec like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_def_rec partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_def_rec; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+RECORD and +# then read it back +SET COMPRESSION_CODEC=SNAPPY_BLOCKED; +SET SEQ_COMPRESSION_MODE=RECORD; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_snapb_rec like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_snapb_rec partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_snapb_rec; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with SNAPPY+RECORD and then read +# it back +SET COMPRESSION_CODEC=SNAPPY; +SET SEQ_COMPRESSION_MODE=RECORD; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_snap_rec like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_snap_rec partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_snap_rec; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with GZIP+RECORD and then read +# it back +SET COMPRESSION_CODEC=GZIP; +SET SEQ_COMPRESSION_MODE=RECORD; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_gzip_rec like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_gzip_rec partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_gzip_rec; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with NONE+BLOCK and then read it +# back +SET COMPRESSION_CODEC=NONE; +SET SEQ_COMPRESSION_MODE=BLOCK; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_none_block like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_none_block partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_none_block; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with DEFAULT+BLOCK and then read +# it back +SET COMPRESSION_CODEC=DEFAULT; +SET 
SEQ_COMPRESSION_MODE=BLOCK; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_def_block like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_def_block partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_def_block; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+BLOCK and +# then read it back +SET COMPRESSION_CODEC=SNAPPY_BLOCKED; +SET SEQ_COMPRESSION_MODE=BLOCK; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_snapb_block like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_snapb_block partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_snapb_block; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with SNAPPY+BLOCK and then read +# it back +SET COMPRESSION_CODEC=SNAPPY; +SET SEQ_COMPRESSION_MODE=BLOCK; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_snap_block like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_snap_block partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_snap_block; +---- RESULTS +37999 +---- TYPES +BIGINT +==== +---- QUERY +# IMPALA-3079: Create a table containing larger seq files with GZIP+BLOCK and then read it +# back +SET COMPRESSION_CODEC=GZIP; +SET SEQ_COMPRESSION_MODE=BLOCK; +SET ALLOW_UNSUPPORTED_FORMATS=1; +create table store_sales_seq_gzip_block like tpcds_parquet.store_sales +stored as SEQUENCEFILE; +insert into store_sales_seq_gzip_block partition(ss_sold_date_sk) +select * from tpcds_parquet.store_sales +where ss_sold_date_sk between 2450816 and 2451200; +==== +---- QUERY +select count(*) from store_sales_seq_gzip_block; +---- RESULTS +37999 +---- TYPES +BIGINT ==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/tests/query_test/test_compressed_formats.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py index 654ea48..36dc427 100644 --- a/tests/query_test/test_compressed_formats.py +++ b/tests/query_test/test_compressed_formats.py @@ -144,18 +144,43 @@ class TestTableWriters(ImpalaTestSuite): (v.get_value('table_format').file_format =='text' and v.get_value('table_format').compression_codec == 'none')) - def test_seq_writer(self, vector): - # TODO debug this test, same as seq writer. - # This caused by a zlib failure. Suspected cause is too small a buffer - # passed to zlib for compression; similar to IMPALA-424 - pytest.skip() - self.run_test_case('QueryTest/seq-writer', vector) + def test_seq_writer(self, vector, unique_database): + self.run_test_case('QueryTest/seq-writer', vector, unique_database) + + def test_seq_writer_hive_compatibility(self, vector, unique_database): + self.client.execute('set ALLOW_UNSUPPORTED_FORMATS=1') + # Write sequence files with different compression codec/compression mode and then read + # it back in Impala and Hive. 
+ # Note that we don't test snappy here as the snappy codec used by Impala does not seem + # to be fully compatible with the snappy codec used by Hive. + for comp_codec, comp_mode in [('NONE', 'RECORD'), ('NONE', 'BLOCK'), + ('DEFAULT', 'RECORD'), ('DEFAULT', 'BLOCK'), + ('GZIP', 'RECORD'), ('GZIP', 'BLOCK')]: + table_name = '%s.seq_tbl_%s_%s' % (unique_database, comp_codec, comp_mode) + self.client.execute('set COMPRESSION_CODEC=%s' % comp_codec) + self.client.execute('set SEQ_COMPRESSION_MODE=%s' % comp_mode) + self.client.execute('create table %s like functional.zipcode_incomes stored as ' + 'sequencefile' % table_name) + # Write sequence file of size greater than 4K + self.client.execute('insert into %s select * from functional.zipcode_incomes where ' + 'zip >= "5"' % table_name) + # Write sequence file of size less than 4K + self.client.execute('insert into %s select * from functional.zipcode_incomes where ' + 'zip="00601"' % table_name) + # Read it back in Impala + output = self.client.execute('select count(*) from %s' % table_name) + assert '16541' == output.get_data() + # Read it back in Hive + output = self.run_stmt_in_hive('select count(*) from %s' % table_name) + assert '16541' == output.split('\n')[1] def test_avro_writer(self, vector): self.run_test_case('QueryTest/avro-writer', vector) def test_text_writer(self, vector): - # TODO debug this test, same as seq writer. + # TODO debug this test. + # This caused by a zlib failure. Suspected cause is too small a buffer + # passed to zlib for compression; similar to IMPALA-424 pytest.skip() self.run_test_case('QueryTest/text-writer', vector)
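----------------------------------------------------------------------
For reference, below is a compilable, standalone sketch of the zero-compressed
VLong encoding that the fixed ReadWriteUtil::VLongRequiredBytes()/PutVLong()
implement (same logic as the patch, with the DCHECKs dropped; the sample values
in main() are arbitrary):

#include <cstdint>
#include <cstdio>

// Number of bytes the encoded value occupies, including the length byte.
static int VLongRequiredBytes(int64_t val) {
  if (val >= -112 && val <= 127) return 1;  // value fits in a single byte
  if (val < 0) val = ~val;                  // one's complement for negatives
  return 9 - __builtin_clzll(static_cast<uint64_t>(val)) / 8;
}

// Writes 'val' into 'buf' (at least 9 bytes long); returns bytes written.
static int PutVLong(int64_t val, uint8_t* buf) {
  int num_bytes = VLongRequiredBytes(val);
  if (num_bytes == 1) {
    buf[0] = static_cast<uint8_t>(val);     // store the value itself
    return 1;
  }
  if (val < 0) {
    buf[0] = static_cast<uint8_t>(-(num_bytes + 119));  // -121..-128: negative
    val = ~val;
  } else {
    buf[0] = static_cast<uint8_t>(-(num_bytes + 111));  // -113..-120: positive
  }
  // The remaining bytes hold the magnitude, highest non-zero byte first.
  for (int i = 1; i < num_bytes; ++i) {
    buf[i] = (val >> (8 * (num_bytes - i - 1))) & 0xFF;
  }
  return num_bytes;
}

int main() {
  const int64_t samples[] = {0, 127, 128, -112, -113, -4294967296LL};
  uint8_t buf[9];
  for (int64_t v : samples) {
    int n = PutVLong(v, buf);
    std::printf("%lld ->", static_cast<long long>(v));
    for (int i = 0; i < n; ++i) std::printf(" %02x", buf[i]);
    std::printf("\n");
  }
  return 0;
}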
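The on-disk shape of a single uncompressed record after this change (4-byte
record length, 4-byte key length, the "\0\0\0\0" key Hive expects, then a
VLong-prefixed value) can be illustrated with the sketch below. It assumes the
value is shorter than 128 bytes, so its VLong length prefix is a single byte;
BuildUncompressedRecord() and PutBigEndianInt() are illustration-only names and
not code from the patch:

#include <cstdint>
#include <string>

// Hadoop serializes 4-byte ints in big-endian order.
static void PutBigEndianInt(std::string* out, uint32_t v) {
  for (int shift = 24; shift >= 0; shift -= 8) {
    out->push_back(static_cast<char>((v >> shift) & 0xFF));
  }
}

// Builds one uncompressed sequence-file record for 'value' (a delimited row).
static std::string BuildUncompressedRecord(const std::string& value) {
  std::string rec;
  // Record length = 4-byte key + 1-byte VLong(value length) + value bytes.
  // It does not count the record-length and key-length fields themselves.
  PutBigEndianInt(&rec, 4 + 1 + value.size());
  PutBigEndianInt(&rec, 4);                        // key length
  rec.append("\0\0\0\0", 4);                       // the key, as Hive writes it
  rec.push_back(static_cast<char>(value.size()));  // 1-byte VLong value length
  rec += value;                                    // the row itself
  return rec;
}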
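Similarly, the Hadoop block-Snappy framing that the fixed
SnappyBlockCompressor::ProcessBlock() emits (a 4-byte big-endian total
uncompressed size, then one size-prefixed compressed chunk per block, split
into two blocks at 4KB so odd lengths are handled) can be sketched as follows.
HadoopBlockCompress() and the copy-through FakeCompress() stand-in for
snappy::RawCompress() are illustration-only:

#include <cstdint>
#include <string>
#include <vector>

// Hadoop serializes 4-byte ints in big-endian order.
static void PutBigEndianInt(std::string* out, uint32_t v) {
  for (int shift = 24; shift >= 0; shift -= 8) {
    out->push_back(static_cast<char>((v >> shift) & 0xFF));
  }
}

// Stand-in for snappy::RawCompress(): returns the input unchanged.
static std::string FakeCompress(const char* data, size_t len) {
  return std::string(data, len);
}

static std::string HadoopBlockCompress(const std::string& input) {
  // Mirror the patched block-size logic: no blocks for empty input, one block
  // below 4KB, otherwise two blocks whose sizes sum to the full input length
  // (this is what fixes the odd-length case).
  std::vector<size_t> block_sizes;
  if (!input.empty()) {
    if (input.size() < 4 * 1024) {
      block_sizes.push_back(input.size());
    } else {
      block_sizes.push_back(input.size() / 2);
      block_sizes.push_back(input.size() - block_sizes[0]);
    }
  }
  std::string out;
  PutBigEndianInt(&out, input.size());         // total uncompressed size
  size_t offset = 0;
  for (size_t block_size : block_sizes) {
    std::string chunk = FakeCompress(input.data() + offset, block_size);
    PutBigEndianInt(&out, chunk.size());       // compressed block size
    out += chunk;                              // compressed block
    offset += block_size;
  }
  return out;
}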
