IMPALA-7190: Remove unsupported format writer support. This patch removes write support for unsupported formats like Sequence, Avro and compressed text. Also, the related query options ALLOW_UNSUPPORTED_FORMATS and SEQ_COMPRESSION_MODE have been migrated to the REMOVED query options type.
Testing: Ran exhaustive build. Change-Id: I821dc7495a901f1658daa500daf3791b386c7185 Reviewed-on: http://gerrit.cloudera.org:8080/10823 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/30e82c63 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/30e82c63 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/30e82c63 Branch: refs/heads/master Commit: 30e82c63ecdd56ded10fed931d95ab6d994b9244 Parents: 6f52ce1 Author: Bikramjeet Vig <[email protected]> Authored: Mon Jun 25 18:11:08 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Jul 3 20:34:27 2018 +0000 ---------------------------------------------------------------------- be/src/exec/CMakeLists.txt | 2 - be/src/exec/hdfs-avro-table-writer.cc | 295 --------------- be/src/exec/hdfs-avro-table-writer.h | 121 ------- be/src/exec/hdfs-sequence-table-writer.cc | 361 ------------------- be/src/exec/hdfs-sequence-table-writer.h | 194 ---------- be/src/exec/hdfs-table-sink.cc | 48 +-- be/src/exec/hdfs-text-table-writer.cc | 61 +--- be/src/exec/hdfs-text-table-writer.h | 9 - be/src/service/query-options-test.cc | 2 - be/src/service/query-options.cc | 16 - be/src/service/query-options.h | 5 +- common/thrift/ImpalaInternalService.thrift | 6 - common/thrift/ImpalaService.thrift | 6 +- .../apache/impala/planner/PlannerTestBase.java | 1 - testdata/bad_avro_snap/README | 4 +- .../queries/QueryTest/avro-writer.test | 43 --- .../queries/QueryTest/seq-writer.test | 308 ---------------- .../functional-query/queries/QueryTest/set.test | 3 - .../queries/QueryTest/text-writer.test | 47 --- .../queries/QueryTest/unsupported-writers.test | 77 ++++ tests/common/test_dimensions.py | 13 - tests/hs2/test_hs2.py | 8 +- tests/metadata/test_partition_metadata.py | 26 +- tests/query_test/test_compressed_formats.py | 62 +--- 
tests/shell/test_shell_interactive.py | 10 +- 25 files changed, 121 insertions(+), 1607 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 1753cb0..4544b95 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -56,12 +56,10 @@ add_library(Exec hdfs-rcfile-scanner.cc hdfs-sequence-scanner.cc hdfs-avro-scanner.cc - hdfs-avro-table-writer.cc hdfs-avro-scanner-ir.cc hdfs-plugin-text-scanner.cc hdfs-text-scanner.cc hdfs-text-table-writer.cc - hdfs-sequence-table-writer.cc hdfs-parquet-scanner.cc hdfs-parquet-scanner-ir.cc hdfs-parquet-table-writer.cc http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-avro-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc deleted file mode 100644 index 3ce296d..0000000 --- a/be/src/exec/hdfs-avro-table-writer.cc +++ /dev/null @@ -1,295 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/hdfs-avro-table-writer.h" - -#include <vector> -#include <hdfs.h> -#include <boost/scoped_ptr.hpp> -#include <stdlib.h> -#include <gutil/strings/substitute.h> - -#include "exec/exec-node.h" -#include "exec/hdfs-table-sink.h" -#include "util/compress.h" -#include "util/hdfs-util.h" -#include "util/uid-util.h" -#include "exprs/scalar-expr.h" -#include "exprs/scalar-expr-evaluator.h" -#include "runtime/mem-pool.h" -#include "runtime/mem-tracker.h" -#include "runtime/raw-value.h" -#include "runtime/row-batch.h" -#include "runtime/runtime-state.h" -#include "runtime/hdfs-fs-cache.h" -#include "runtime/types.h" -#include "util/runtime-profile-counters.h" -#include "write-stream.inline.h" - -#include "common/names.h" - -using namespace strings; -using namespace impala; - -const uint8_t OBJ1[4] = {'O', 'b', 'j', 1}; -const char* AVRO_SCHEMA_STR = "avro.schema"; -const char* AVRO_CODEC_STR = "avro.codec"; -const THdfsCompression::type AVRO_DEFAULT_CODEC = THdfsCompression::SNAPPY; -// Desired size of each Avro block (bytes); actual block size will vary +/- the -// size of a row. This is approximate size of the block before compression. 
-const int DEFAULT_AVRO_BLOCK_SIZE = 64 * 1024; - -HdfsAvroTableWriter::HdfsAvroTableWriter(HdfsTableSink* parent, - RuntimeState* state, OutputPartition* output, - const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc) - : HdfsTableWriter(parent, state, output, partition, table_desc), - unflushed_rows_(0) { - mem_pool_.reset(new MemPool(parent->mem_tracker())); -} - -void HdfsAvroTableWriter::ConsumeRow(TupleRow* row) { - ++unflushed_rows_; - int num_non_partition_cols = - table_desc_->num_cols() - table_desc_->num_clustering_cols(); - for (int j = 0; j < num_non_partition_cols; ++j) { - void* value = output_expr_evals_[j]->GetValue(row); - AppendField(output_expr_evals_[j]->root().type(), value); - } -} - -inline void HdfsAvroTableWriter::AppendField(const ColumnType& type, const void* value) { - // Each avro field is written as union, which is a ZLong indicating the union - // field followed by the encoded value. Impala/Hive always stores values as - // a union of [ColumnType, NULL]. - // TODO: the writer may be asked to write [NULL, ColumnType] unions. It is wrong - // for us to assume [ColumnType, NULL]. - - if (value == NULL) { - // indicate the second field of the union - out_.WriteZLong(1); - // No bytes are written for a null value. 
- return; - } - - // indicate that we are using the first field of the union - out_.WriteZLong(0); - - switch (type.type) { - case TYPE_BOOLEAN: - out_.WriteByte(*reinterpret_cast<const char*>(value)); - break; - case TYPE_TINYINT: - out_.WriteZInt(*reinterpret_cast<const int8_t*>(value)); - break; - case TYPE_SMALLINT: - out_.WriteZInt(*reinterpret_cast<const int16_t*>(value)); - break; - case TYPE_INT: - out_.WriteZInt(*reinterpret_cast<const int32_t*>(value)); - break; - case TYPE_BIGINT: - out_.WriteZLong(*reinterpret_cast<const int64_t*>(value)); - break; - case TYPE_FLOAT: - out_.WriteBytes(4, reinterpret_cast<const char*>(value)); - break; - case TYPE_DOUBLE: - out_.WriteBytes(8, reinterpret_cast<const char*>(value)); - break; - case TYPE_STRING: { - const StringValue& sv = *reinterpret_cast<const StringValue*>(value); - out_.WriteZLong(sv.len); - out_.WriteBytes(sv.len, sv.ptr); - break; - } - case TYPE_DECIMAL: { - int byte_size = ColumnType::GetDecimalByteSize(type.precision); - out_.WriteZLong(byte_size); -#if __BYTE_ORDER == __LITTLE_ENDIAN - char tmp[16]; - BitUtil::ByteSwap(tmp, value, byte_size); - out_.WriteBytes(byte_size, tmp); -#else - out_.WriteBytes(byte_size, reinterpret_cast<const char*>(value)); -#endif - break; - } - case TYPE_TIMESTAMP: - case TYPE_BINARY: - case INVALID_TYPE: - case TYPE_NULL: - case TYPE_DATE: - case TYPE_DATETIME: - default: - DCHECK(false); - } -} - -Status HdfsAvroTableWriter::Init() { - // create the Sync marker - sync_marker_ = GenerateUUIDString(); - - THdfsCompression::type codec = AVRO_DEFAULT_CODEC; - if (state_->query_options().__isset.compression_codec) { - codec = state_->query_options().compression_codec; - } - - // sets codec_name_ and compressor_ - codec_type_ = codec; - switch (codec) { - case THdfsCompression::SNAPPY: - codec_name_ = "snappy"; - break; - case THdfsCompression::DEFLATE: - codec_name_ = "deflate"; - break; - case THdfsCompression::NONE: - codec_name_ = "null"; - return Status::OK(); - 
default: - const char* name = _THdfsCompression_VALUES_TO_NAMES.find(codec)->second; - return Status(Substitute( - "Avro only supports NONE, DEFLATE, and SNAPPY codecs; unsupported codec $0", - name)); - } - RETURN_IF_ERROR(Codec::CreateCompressor(mem_pool_.get(), true, codec, &compressor_)); - DCHECK(compressor_.get() != NULL); - - return Status::OK(); -} - -void HdfsAvroTableWriter::Close() { - mem_pool_->FreeAll(); -} - -Status HdfsAvroTableWriter::AppendRows( - RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) { - int32_t limit; - bool all_rows = row_group_indices.empty(); - if (all_rows) { - limit = batch->num_rows(); - } else { - limit = row_group_indices.size(); - } - COUNTER_ADD(parent_->rows_inserted_counter(), limit); - - { - SCOPED_TIMER(parent_->encode_timer()); - for (int row_idx = 0; row_idx < limit; ++row_idx) { - TupleRow* row = all_rows ? - batch->GetRow(row_idx) : batch->GetRow(row_group_indices[row_idx]); - ConsumeRow(row); - } - } - - if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush()); - *new_file = false; - return Status::OK(); -} - -Status HdfsAvroTableWriter::WriteFileHeader() { - out_.Clear(); - out_.WriteBytes(4, reinterpret_cast<const uint8_t*>(OBJ1)); - - // Write 'File Metadata' as an encoded avro map - // number of key/value pairs in the map - out_.WriteZLong(2); - - // Schema information - out_.WriteZLong(strlen(AVRO_SCHEMA_STR)); - out_.WriteBytes(strlen(AVRO_SCHEMA_STR), AVRO_SCHEMA_STR); - const string& avro_schema = table_desc_->avro_schema(); - out_.WriteZLong(avro_schema.size()); - out_.WriteBytes(avro_schema.size(), avro_schema.data()); - - // codec information - out_.WriteZLong(strlen(AVRO_CODEC_STR)); - out_.WriteBytes(strlen(AVRO_CODEC_STR), AVRO_CODEC_STR); - out_.WriteZLong(codec_name_.size()); - out_.WriteBytes(codec_name_.size(), codec_name_.data()); - - // Write end of map marker - out_.WriteZLong(0); - - out_.WriteBytes(sync_marker_.size(), sync_marker_.data()); - - const 
string& text = out_.String(); - RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()), - text.size())); - out_.Clear(); - return Status::OK(); -} - -Status HdfsAvroTableWriter::Flush() { - if (unflushed_rows_ == 0) return Status::OK(); - - WriteStream header; - // 1. Count of objects in this block - header.WriteZLong(unflushed_rows_); - - const uint8_t* output; - int64_t output_length; - // Snappy format requires a CRC after the compressed data - uint32_t crc; - const string& text = out_.String(); - - if (codec_type_ != THdfsCompression::NONE) { - SCOPED_TIMER(parent_->compress_timer()); - uint8_t* temp; - RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(), - reinterpret_cast<const uint8_t*>(text.data()), &output_length, &temp)); - output = temp; - if (codec_type_ == THdfsCompression::SNAPPY) { - crc = SnappyCompressor::ComputeChecksum( - text.size(), reinterpret_cast<const uint8_t*>(text.data())); - } - } else { - output = reinterpret_cast<const uint8_t*>(text.data()); - output_length = out_.Size(); - } - - // 2. length of serialized objects - if (codec_type_ == THdfsCompression::SNAPPY) { - // + 4 for the CRC checksum at the end of the compressed block - header.WriteZLong(output_length + 4); - } else { - header.WriteZLong(output_length); - } - - const string& head = header.String(); - { - SCOPED_TIMER(parent_->hdfs_write_timer()); - // Flush (1) and (2) to HDFS - RETURN_IF_ERROR( - Write(reinterpret_cast<const uint8_t*>(head.data()), head.size())); - // 3. serialized objects - RETURN_IF_ERROR(Write(output, output_length)); - - // Write CRC checksum - if (codec_type_ == THdfsCompression::SNAPPY) { - RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(&crc), sizeof(uint32_t))); - } - } - - // 4. 
sync marker - RETURN_IF_ERROR( - Write(reinterpret_cast<const uint8_t*>(sync_marker_.data()), sync_marker_.size())); - - out_.Clear(); - unflushed_rows_ = 0; - return Status::OK(); -} http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-avro-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h deleted file mode 100644 index 6966860..0000000 --- a/be/src/exec/hdfs-avro-table-writer.h +++ /dev/null @@ -1,121 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_EXEC_HDFS_AVRO_WRITER_H -#define IMPALA_EXEC_HDFS_AVRO_WRITER_H - -#include <hdfs.h> -#include <sstream> -#include <string> - -#include "common/status.h" -#include "exec/hdfs-table-writer.h" -#include "runtime/mem-pool.h" -#include "util/codec.h" -#include "exec/write-stream.h" - -namespace impala { - -struct ColumnType; -class HdfsTableSink; -class RuntimeState; -class ScalarExprEvaluator; -class TupleDescriptor; -class TupleRow; -struct OutputPartition; -struct StringValue; - -/// Consumes rows and outputs the rows into an Avro file in HDFS -/// Each Avro file contains a block of records (rows). 
The file metadata specifies the -/// schema of the records in addition to the name of the codec, if any, used to compress -/// blocks. The structure is: -/// [ Metadata ] -/// [ Sync Marker ] -/// [ Data Block ] -/// ... -/// [ Data Block ] -// -/// Each Data Block consists of: -/// [ Number of Rows in Block ] -/// [ Size of serialized objects, after compression ] -/// [ Serialized objects, compressed ] -/// [ Sync Marker ] -// -/// If compression is used, each block is compressed individually. The block size defaults -/// to about 64KB before compression. -/// This writer implements the Avro 1.7.7 spec: -/// http://avro.apache.org/docs/1.7.7/spec.html -class HdfsAvroTableWriter : public HdfsTableWriter { - public: - HdfsAvroTableWriter(HdfsTableSink* parent, - RuntimeState* state, OutputPartition* output, - const HdfsPartitionDescriptor* partition, - const HdfsTableDescriptor* table_desc); - - virtual ~HdfsAvroTableWriter() { } - - virtual Status Init() override; - virtual Status Finalize() override { return Flush(); } - virtual Status InitNewFile() override { return WriteFileHeader(); } - virtual void Close() override; - virtual uint64_t default_block_size() const override { return 0; } - virtual std::string file_extension() const override { return "avro"; } - - /// Outputs the given rows into an HDFS sequence file. The rows are buffered - /// to fill a sequence file block. - virtual Status AppendRows(RowBatch* rows, - const std::vector<int32_t>& row_group_indices, bool* new_file) override; - - private: - /// Processes a single row, appending to out_ - void ConsumeRow(TupleRow* row); - - /// Adds an encoded field to out_ - inline void AppendField(const ColumnType& type, const void* value); - - /// Writes the Avro file header to HDFS - Status WriteFileHeader() WARN_UNUSED_RESULT; - - /// Writes the contents of out_ to HDFS as a single Avro file block. - /// Returns an error if write to HDFS fails. 
- Status Flush() WARN_UNUSED_RESULT; - - /// Buffer which holds accumulated output - WriteStream out_; - - /// Memory pool used by codec to allocate output buffer. - /// Owned by this class. Initialized using parent's memtracker. - boost::scoped_ptr<MemPool> mem_pool_; - - /// Number of rows consumed since last flush - uint64_t unflushed_rows_; - - /// Name of codec, only set if codec_type_ != NONE - std::string codec_name_; - - /// Type of the codec, will be NONE if no compression is used - THdfsCompression::type codec_type_; - - /// The codec for compressing, only set if codec_type_ != NONE - boost::scoped_ptr<Codec> compressor_; - - /// 16 byte sync marker (a uuid) - std::string sync_marker_; -}; - -} // namespace impala -#endif http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-sequence-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc deleted file mode 100644 index 42a70f0..0000000 --- a/be/src/exec/hdfs-sequence-table-writer.cc +++ /dev/null @@ -1,361 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "exec/hdfs-sequence-table-writer.h" -#include "exec/write-stream.inline.h" -#include "exec/exec-node.h" -#include "util/hdfs-util.h" -#include "util/uid-util.h" -#include "exprs/scalar-expr.h" -#include "exprs/scalar-expr-evaluator.h" -#include "runtime/mem-tracker.h" -#include "runtime/raw-value.h" -#include "runtime/row-batch.h" -#include "runtime/runtime-state.h" -#include "runtime/hdfs-fs-cache.h" -#include "util/runtime-profile-counters.h" - -#include <vector> -#include <hdfs.h> -#include <boost/scoped_ptr.hpp> -#include <stdlib.h> - -#include "common/names.h" - -namespace impala { - -const uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6}; -const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text"; -const char* HdfsSequenceTableWriter::KEY_CLASS_NAME = - "org.apache.hadoop.io.BytesWritable"; - -HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent, - RuntimeState* state, OutputPartition* output, - const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc) - : HdfsTableWriter(parent, state, output, partition, table_desc), - mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false), - unflushed_rows_(0), record_compression_(false) { - approx_block_size_ = 64 * 1024 * 1024; - parent->mem_tracker()->Consume(approx_block_size_); - field_delim_ = partition->field_delim(); - escape_char_ = partition->escape_char(); -} - -Status HdfsSequenceTableWriter::Init() { - THdfsCompression::type codec = THdfsCompression::SNAPPY_BLOCKED; - const TQueryOptions& query_options = state_->query_options(); - if (query_options.__isset.compression_codec) { - codec = query_options.compression_codec; - if (codec == THdfsCompression::SNAPPY) { - // Seq file (and in general things that use hadoop.io.codec) always - // mean snappy_blocked. 
- codec = THdfsCompression::SNAPPY_BLOCKED; - } - } - if (codec != THdfsCompression::NONE) { - compress_flag_ = true; - if (query_options.__isset.seq_compression_mode) { - record_compression_ = - query_options.seq_compression_mode == THdfsSeqCompressionMode::RECORD; - } - RETURN_IF_ERROR(Codec::GetHadoopCodecClassName(codec, &codec_name_)); - RETURN_IF_ERROR(Codec::CreateCompressor( - mem_pool_.get(), true, codec_name_, &compressor_)); - DCHECK(compressor_.get() != NULL); - } - - // create the Sync marker - string uuid = GenerateUUIDString(); - uint8_t sync_neg1[20]; - - ReadWriteUtil::PutInt(sync_neg1, static_cast<uint32_t>(-1)); - DCHECK(uuid.size() == 16); - memcpy(sync_neg1 + sizeof(int32_t), uuid.data(), uuid.size()); - neg1_sync_marker_ = string(reinterpret_cast<char*>(sync_neg1), 20); - sync_marker_ = uuid; - - return Status::OK(); -} - -Status HdfsSequenceTableWriter::AppendRows( - RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) { - int32_t limit; - if (row_group_indices.empty()) { - limit = batch->num_rows(); - } else { - limit = row_group_indices.size(); - } - COUNTER_ADD(parent_->rows_inserted_counter(), limit); - - bool all_rows = row_group_indices.empty(); - int num_non_partition_cols = - table_desc_->num_cols() - table_desc_->num_clustering_cols(); - DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols) - << parent_->DebugString(); - - { - SCOPED_TIMER(parent_->encode_timer()); - if (all_rows) { - for (int row_idx = 0; row_idx < limit; ++row_idx) { - RETURN_IF_ERROR(ConsumeRow(batch->GetRow(row_idx))); - } - } else { - for (int row_idx = 0; row_idx < limit; ++row_idx) { - TupleRow* row = batch->GetRow(row_group_indices[row_idx]); - RETURN_IF_ERROR(ConsumeRow(row)); - } - } - } - - if (!compress_flag_) { - out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data()); - } - - if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush()); - *new_file = false; - return Status::OK(); -} - -Status 
HdfsSequenceTableWriter::WriteFileHeader() { - out_.WriteBytes(sizeof(SEQ6_CODE), SEQ6_CODE); - - // Setup to be correct key class - out_.WriteText(strlen(KEY_CLASS_NAME), - reinterpret_cast<const uint8_t*>(KEY_CLASS_NAME)); - - // Setup to be correct value class - out_.WriteText(strlen(VALUE_CLASS_NAME), - reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME)); - - // Flag for if compression is used - out_.WriteBoolean(compress_flag_); - // Only valid if compression is used. Indicates if block compression is used. - out_.WriteBoolean(compress_flag_ && !record_compression_); - - // Output the name of our compression codec, parsed by readers - if (compress_flag_) { - out_.WriteText(codec_name_.size(), - reinterpret_cast<const uint8_t*>(codec_name_.data())); - } - - // Meta data is formated as an integer N followed by N*2 strings, - // which are key-value pairs. Hive does not write meta data, so neither does Impala - out_.WriteInt(0); - - // write the sync marker - out_.WriteBytes(sync_marker_.size(), sync_marker_.data()); - - string text = out_.String(); - RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()), text.size())); - out_.Clear(); - return Status::OK(); -} - -Status HdfsSequenceTableWriter::WriteCompressedBlock() { - WriteStream record; - uint8_t *output; - int64_t output_length; - DCHECK(compress_flag_); - - // Add a sync marker to start of the block - record.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data()); - - // Output the number of rows in this block - record.WriteVLong(unflushed_rows_); - - // Output compressed key-lengths block-size & compressed key-lengths block. - // The key-lengths block contains byte value of 4 as a key length for each row (this is - // what Hive does). 
- string key_lengths_text(unflushed_rows_, '\x04'); - { - SCOPED_TIMER(parent_->compress_timer()); - RETURN_IF_ERROR(compressor_->ProcessBlock(false, key_lengths_text.size(), - reinterpret_cast<const uint8_t*>(key_lengths_text.data()), &output_length, - &output)); - } - record.WriteVInt(output_length); - record.WriteBytes(output_length, output); - - // Output compressed keys block-size & compressed keys block. - // The keys block contains "\0\0\0\0" byte sequence as a key for each row (this is what - // Hive does). - string keys_text(unflushed_rows_ * 4, '\0'); - { - SCOPED_TIMER(parent_->compress_timer()); - RETURN_IF_ERROR(compressor_->ProcessBlock(false, keys_text.size(), - reinterpret_cast<const uint8_t*>(keys_text.data()), &output_length, &output)); - } - record.WriteVInt(output_length); - record.WriteBytes(output_length, output); - - // Output compressed value-lengths block-size & compressed value-lengths block - string value_lengths_text = out_value_lengths_block_.String(); - { - SCOPED_TIMER(parent_->compress_timer()); - RETURN_IF_ERROR(compressor_->ProcessBlock(false, value_lengths_text.size(), - reinterpret_cast<const uint8_t*>(value_lengths_text.data()), &output_length, &output)); - } - record.WriteVInt(output_length); - record.WriteBytes(output_length, output); - - // Output compressed values block-size & compressed values block - string text = out_.String(); - { - SCOPED_TIMER(parent_->compress_timer()); - RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(), - reinterpret_cast<const uint8_t*>(text.data()), &output_length, &output)); - } - record.WriteVInt(output_length); - record.WriteBytes(output_length, output); - - string rec = record.String(); - RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(rec.data()), rec.size())); - return Status::OK(); -} - -inline void HdfsSequenceTableWriter::WriteEscapedString(const StringValue* str_val, - WriteStream* buf) { - for (int i = 0; i < str_val->len; ++i) { - if (str_val->ptr[i] == field_delim_ 
|| str_val->ptr[i] == escape_char_) { - buf->WriteByte(escape_char_); - } - buf->WriteByte(str_val->ptr[i]); - } -} - -void HdfsSequenceTableWriter::EncodeRow(TupleRow* row, WriteStream* buf) { - // TODO Unify with text table writer - int num_non_partition_cols = - table_desc_->num_cols() - table_desc_->num_clustering_cols(); - DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols) - << parent_->DebugString(); - for (int j = 0; j < num_non_partition_cols; ++j) { - void* value = output_expr_evals_[j]->GetValue(row); - if (value != NULL) { - if (output_expr_evals_[j]->root().type().type == TYPE_STRING) { - WriteEscapedString(reinterpret_cast<const StringValue*>(value), &row_buf_); - } else { - string str; - output_expr_evals_[j]->PrintValue(value, &str); - buf->WriteBytes(str.size(), str.data()); - } - } else { - // NULLs in hive are encoded based on the 'serialization.null.format' property. - const string& null_val = table_desc_->null_column_value(); - buf->WriteBytes(null_val.size(), null_val.data()); - } - // Append field delimiter. - if (j + 1 < num_non_partition_cols) { - buf->WriteByte(field_delim_); - } - } -} - -inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) { - ++unflushed_rows_; - row_buf_.Clear(); - if (compress_flag_ && !record_compression_) { - // Output row for a block compressed sequence file. - // Value block: Write the length as a vlong and then write the contents. 
- EncodeRow(row, &row_buf_); - out_.WriteVLong(row_buf_.Size()); - out_.WriteBytes(row_buf_.Size(), row_buf_.String().data()); - // Value-lengths block: Write the number of bytes we have just written to out_ as - // vlong - out_value_lengths_block_.WriteVLong( - ReadWriteUtil::VLongRequiredBytes(row_buf_.Size()) + row_buf_.Size()); - return Status::OK(); - } - - EncodeRow(row, &row_buf_); - - const uint8_t* value_bytes; - int64_t value_length; - string text = row_buf_.String(); - if (compress_flag_) { - // apply compression to row_buf_ - // the length of the buffer must be prefixed to the buffer prior to compression - // - // TODO this incurs copy overhead to place the length in front of the - // buffer prior to compression. We may want to rewrite to avoid copying. - row_buf_.Clear(); - // encoding as "Text" writes the length before the text - row_buf_.WriteText(text.size(), reinterpret_cast<const uint8_t*>(&text.data()[0])); - text = row_buf_.String(); - uint8_t *tmp; - { - SCOPED_TIMER(parent_->compress_timer()); - RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(), - reinterpret_cast<const uint8_t*>(text.data()), &value_length, &tmp)); - } - value_bytes = tmp; - } else { - value_length = text.size(); - DCHECK_EQ(value_length, row_buf_.Size()); - value_bytes = reinterpret_cast<const uint8_t*>(text.data()); - } - - int rec_len = value_length; - // if the record is compressed, the length is part of the compressed text - // if not, then we need to write the length (below) and account for it's size - if (!compress_flag_) { - rec_len += ReadWriteUtil::VLongRequiredBytes(value_length); - } - // The record contains the key, account for it's size (we use "\0\0\0\0" byte sequence - // as a key just like Hive). - rec_len += 4; - - // Length of the record (incl. 
key and value length) - out_.WriteInt(rec_len); - - // Write length of the key and the key - out_.WriteInt(4); - out_.WriteBytes(4, "\0\0\0\0"); - - // if the record is compressed, the length is part of the compressed text - if (!compress_flag_) out_.WriteVLong(value_length); - - // write out the value (possibly compressed) - out_.WriteBytes(value_length, value_bytes); - return Status::OK(); -} - -Status HdfsSequenceTableWriter::Flush() { - if (unflushed_rows_ == 0) return Status::OK(); - - SCOPED_TIMER(parent_->hdfs_write_timer()); - - if (compress_flag_ && !record_compression_) { - RETURN_IF_ERROR(WriteCompressedBlock()); - } else { - string out_str = out_.String(); - RETURN_IF_ERROR( - Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size())); - } - out_.Clear(); - out_value_lengths_block_.Clear(); - unflushed_rows_ = 0; - return Status::OK(); -} - -void HdfsSequenceTableWriter::Close() { - // TODO: double check there is no memory leak. - parent_->mem_tracker()->Release(approx_block_size_); - mem_pool_->FreeAll(); -} - -} // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-sequence-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h deleted file mode 100644 index f315920..0000000 --- a/be/src/exec/hdfs-sequence-table-writer.h +++ /dev/null @@ -1,194 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_EXEC_HDFS_SEQUENCE_WRITER_H -#define IMPALA_EXEC_HDFS_SEQUENCE_WRITER_H - -#include <hdfs.h> -#include <sstream> - -#include "runtime/descriptors.h" -#include "exec/hdfs-table-sink.h" -#include "exec/hdfs-table-writer.h" -#include "util/codec.h" -#include "write-stream.h" - -namespace impala { - -class Expr; -class TupleDescriptor; -class TupleRow; -class RuntimeState; -struct StringValue; -struct OutputPartition; - -/// Sequence files are flat files consisting of binary key/value pairs. Essentially there -/// are 3 different formats for sequence files depending on the 'compression_codec' and -/// 'seq_compression_mode' query options: -/// - Uncompressed sequence file format -/// - Record-compressed sequence file format -/// - Block-compressed sequence file format -/// All of them share a common header described below. -/// -/// Sequence File Header -/// -------------------- -/// - version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number -/// (e.g. SEQ4 or SEQ6) -/// - keyClassName - key class -/// - valueClassName - value class -/// - compression - A boolean which specifies if compression is turned on for keys/values -/// in this file. -/// - blockCompression - A boolean which specifies if block-compression is turned on for -/// keys/values in this file. -/// - compression codec - compression codec class which is used for compression of keys -/// and/or values (if compression is enabled). -/// - metadata - SequenceFile.Metadata for this file. 
-/// - sync - A 16 byte sync marker to denote end of the header. -/// -/// Uncompressed Sequence File Format -/// --------------------------------- -/// - Header -/// - Record -/// - Record length -/// - Key length -/// - Key -/// - Value -/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so. -/// -/// Record-Compressed Sequence File Format -/// -------------------------------------- -/// - Header -/// - Record -/// - Record length -/// - Key length -/// - Key -/// - Compressed Value -/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so. -/// -/// Block-Compressed Sequence File Format -/// ------------------------------------- -/// - Header -/// - Record Block -/// - Uncompressed number of records in the block -/// - Compressed key-lengths block-size -/// - Compressed key-lengths block -/// - Compressed keys block-size -/// - Compressed keys block -/// - Compressed value-lengths block-size -/// - Compressed value-lengths block -/// - Compressed values block-size -/// - Compressed values block -/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every block. -/// The compressed blocks of key lengths and value lengths consist of the actual lengths -/// of individual keys/values encoded in zero-compressed integer format. - -/// Consumes rows and outputs the rows into a sequence file in HDFS -/// Output is buffered to fill sequence file blocks. 
-class HdfsSequenceTableWriter : public HdfsTableWriter { - public: - HdfsSequenceTableWriter(HdfsTableSink* parent, RuntimeState* state, - OutputPartition* output, const HdfsPartitionDescriptor* partition, - const HdfsTableDescriptor* table_desc); - - ~HdfsSequenceTableWriter() { } - - virtual Status Init(); - virtual Status Finalize() { return Flush(); } - virtual Status InitNewFile() { return WriteFileHeader(); } - virtual void Close(); - virtual uint64_t default_block_size() const { return 0; } - virtual std::string file_extension() const { return "seq"; } - - /// Outputs the given rows into an HDFS sequence file. The rows are buffered - /// to fill a sequence file block. - virtual Status AppendRows( - RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file); - - private: - /// processes a single row, delegates to Compress or NoCompress ConsumeRow(). - inline Status ConsumeRow(TupleRow* row); - - /// writes the SEQ file header to HDFS - Status WriteFileHeader(); - - /// writes the contents of out_value_lengths_block_ and out_ as a single - /// block-compressed record. - Status WriteCompressedBlock(); - - /// writes the tuple row to the given buffer; separates fields by field_delim_, - /// escapes string. - inline void EncodeRow(TupleRow* row, WriteStream* buf); - - /// writes the str_val to the buffer, escaping special characters - inline void WriteEscapedString(const StringValue* str_val, WriteStream* buf); - - /// flushes the output -- clearing out_ and writing to HDFS - /// if compress_flag_, will write contents of out_ as a single compressed block - Status Flush(); - - /// desired size of each block (bytes); actual block size will vary +/- the - /// size of a row; this is before compression is applied. 
- uint64_t approx_block_size_; - - /// buffer which holds accumulated output - WriteStream out_; - - /// buffer which holds accumulated value-lengths output (used with block-compressed - /// sequence files) - WriteStream out_value_lengths_block_; - - /// Temporary Buffer for a single row - WriteStream row_buf_; - - /// memory pool used by codec to allocate output buffer - boost::scoped_ptr<MemPool> mem_pool_; - - /// true if compression is enabled - bool compress_flag_; - - /// number of rows consumed since last flush - uint64_t unflushed_rows_; - - /// name of codec, only set if compress_flag_ - std::string codec_name_; - /// the codec for compressing, only set if compress_flag_ - boost::scoped_ptr<Codec> compressor_; - - /// true if compression is applied on each record individually - bool record_compression_; - - /// Character delimiting fields - char field_delim_; - - /// Escape character for text encoding - char escape_char_; - - /// 16 byte sync marker (a uuid) - std::string sync_marker_; - /// A -1 infront of the sync marker, used in decompressed formats - std::string neg1_sync_marker_; - - /// Name of java class to use when reading the keys - static const char* KEY_CLASS_NAME; - /// Name of java class to use when reading the values - static const char* VALUE_CLASS_NAME; - /// Magic characters used to identify the file type - static const uint8_t SEQ6_CODE[4]; -}; - -} // namespace impala -#endif http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index b6de7cf..9c46638 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -18,8 +18,6 @@ #include "exec/hdfs-table-sink.h" #include "exec/hdfs-table-writer.h" #include "exec/hdfs-text-table-writer.h" -#include "exec/hdfs-sequence-table-writer.h" -#include "exec/hdfs-avro-table-writer.h" #include 
"exec/hdfs-parquet-table-writer.h" #include "exec/exec-node.h" #include "gen-cpp/ImpalaInternalService_constants.h" @@ -469,28 +467,20 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, output_partition->partition_descriptor = &partition_descriptor; - bool allow_unsupported_formats = - state->query_options().__isset.allow_unsupported_formats && - state->query_options().allow_unsupported_formats; - if (!allow_unsupported_formats) { - if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE || - partition_descriptor.file_format() == THdfsFileFormat::AVRO) { - stringstream error_msg; - map<int, const char*>::const_iterator i = - _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format()); - error_msg << "Writing to table format " << i->second - << " is not supported. Use query option ALLOW_UNSUPPORTED_FORMATS" - " to override."; - return Status(error_msg.str()); - } - if (partition_descriptor.file_format() == THdfsFileFormat::TEXT && - state->query_options().__isset.compression_codec && - state->query_options().compression_codec != THdfsCompression::NONE) { - stringstream error_msg; - error_msg << "Writing to compressed text table is not supported. 
" - "Use query option ALLOW_UNSUPPORTED_FORMATS to override."; - return Status(error_msg.str()); - } + if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE || + partition_descriptor.file_format() == THdfsFileFormat::AVRO) { + stringstream error_msg; + map<int, const char*>::const_iterator i = + _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format()); + error_msg << "Writing to table format " << i->second << " is not supported."; + return Status(error_msg.str()); + } + if (partition_descriptor.file_format() == THdfsFileFormat::TEXT && + state->query_options().__isset.compression_codec && + state->query_options().compression_codec != THdfsCompression::NONE) { + stringstream error_msg; + error_msg << "Writing to compressed text table is not supported. "; + return Status(error_msg.str()); } // It is incorrect to initialize a writer if there are no rows to feed it. The writer @@ -508,16 +498,6 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, new HdfsParquetTableWriter( this, state, output_partition, &partition_descriptor, table_desc_)); break; - case THdfsFileFormat::SEQUENCE_FILE: - output_partition->writer.reset( - new HdfsSequenceTableWriter( - this, state, output_partition, &partition_descriptor, table_desc_)); - break; - case THdfsFileFormat::AVRO: - output_partition->writer.reset( - new HdfsAvroTableWriter( - this, state, output_partition, &partition_descriptor, table_desc_)); - break; default: stringstream error_msg; map<int, const char*>::const_iterator i = http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-text-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc index aaee773..f09b161 100644 --- a/be/src/exec/hdfs-text-table-writer.cc +++ b/be/src/exec/hdfs-text-table-writer.cc @@ -25,8 +25,6 @@ #include "runtime/row-batch.h" #include 
"runtime/runtime-state.h" #include "runtime/string-value.inline.h" -#include "util/codec.h" -#include "util/compress.h" #include "util/hdfs-util.h" #include "util/runtime-profile-counters.h" @@ -35,13 +33,6 @@ #include "common/names.h" -// Hdfs block size for compressed text. -static const int64_t COMPRESSED_BLOCK_SIZE = 64 * 1024 * 1024; - -// Size to buffer before compression. We want this to be less than the block size -// (compressed text is not splittable). -static const int64_t COMPRESSED_BUFFERED_SIZE = 60 * 1024 * 1024; - namespace impala { HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent, @@ -61,41 +52,17 @@ HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent, } Status HdfsTextTableWriter::Init() { - const TQueryOptions& query_options = state_->query_options(); - codec_ = THdfsCompression::NONE; - if (query_options.__isset.compression_codec) { - codec_ = query_options.compression_codec; - if (codec_ == THdfsCompression::SNAPPY) { - // hadoop.io.codec always means SNAPPY_BLOCKED. Alias the two. - codec_ = THdfsCompression::SNAPPY_BLOCKED; - } - } - - if (codec_ != THdfsCompression::NONE) { - mem_pool_.reset(new MemPool(parent_->mem_tracker())); - RETURN_IF_ERROR(Codec::CreateCompressor( - mem_pool_.get(), true, codec_, &compressor_)); - flush_size_ = COMPRESSED_BUFFERED_SIZE; - } else { - flush_size_ = HDFS_FLUSH_WRITE_SIZE; - } parent_->mem_tracker()->Consume(flush_size_); return Status::OK(); } void HdfsTextTableWriter::Close() { parent_->mem_tracker()->Release(flush_size_); - if (mem_pool_.get() != NULL) mem_pool_->FreeAll(); } -uint64_t HdfsTextTableWriter::default_block_size() const { - return compressor_.get() == NULL ? 
0 : COMPRESSED_BLOCK_SIZE; -} +uint64_t HdfsTextTableWriter::default_block_size() const { return 0; } -string HdfsTextTableWriter::file_extension() const { - if (compressor_.get() == NULL) return ""; - return compressor_->file_extension(); -} +string HdfsTextTableWriter::file_extension() const { return ""; } Status HdfsTextTableWriter::AppendRows( RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) { @@ -152,12 +119,7 @@ Status HdfsTextTableWriter::AppendRows( } *new_file = false; - if (rowbatch_stringstream_.tellp() >= flush_size_) { - RETURN_IF_ERROR(Flush()); - - // If compressed, start a new file (compressed data is not splittable). - *new_file = compressor_.get() != NULL; - } + if (rowbatch_stringstream_.tellp() >= flush_size_) RETURN_IF_ERROR(Flush()); return Status::OK(); } @@ -178,22 +140,9 @@ Status HdfsTextTableWriter::InitNewFile() { Status HdfsTextTableWriter::Flush() { string rowbatch_string = rowbatch_stringstream_.str(); rowbatch_stringstream_.str(string()); - const uint8_t* uncompressed_data = + const uint8_t* data = reinterpret_cast<const uint8_t*>(rowbatch_string.data()); - int64_t uncompressed_len = rowbatch_string.size(); - const uint8_t* data = uncompressed_data; - int64_t len = uncompressed_len; - - if (compressor_.get() != NULL) { - SCOPED_TIMER(parent_->compress_timer()); - uint8_t* compressed_data; - int64_t compressed_len; - RETURN_IF_ERROR(compressor_->ProcessBlock(false, - uncompressed_len, uncompressed_data, - &compressed_len, &compressed_data)); - data = compressed_data; - len = compressed_len; - } + int64_t len = rowbatch_string.size(); { SCOPED_TIMER(parent_->hdfs_write_timer()); http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-text-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-table-writer.h b/be/src/exec/hdfs-text-table-writer.h index 589ed23..e2f6135 100644 --- a/be/src/exec/hdfs-text-table-writer.h +++ 
b/be/src/exec/hdfs-text-table-writer.h @@ -87,15 +87,6 @@ class HdfsTextTableWriter : public HdfsTableWriter { /// Stringstream to buffer output. The stream is cleared between HDFS /// Write calls to allow for the internal buffers to be reused. std::stringstream rowbatch_stringstream_; - - /// Compression codec. - THdfsCompression::type codec_; - - /// Compressor if compression is enabled. - boost::scoped_ptr<Codec> compressor_; - - /// Memory pool to use with compressor_. - boost::scoped_ptr<MemPool> mem_pool_; }; } http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options-test.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index e5bc48d..b9bda60 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -208,8 +208,6 @@ TEST(QueryOptions, SetEnumOptions) { TParquetFallbackSchemaResolution, (POSITION, NAME)), true); TestEnumCase(options, CASE(parquet_array_resolution, TParquetArrayResolution, (THREE_LEVEL, TWO_LEVEL, TWO_LEVEL_THEN_THREE_LEVEL)), true); - TestEnumCase(options, CASE(seq_compression_mode, THdfsSeqCompressionMode, - (BLOCK, RECORD)), false); TestEnumCase(options, CASE(compression_codec, THdfsCompression, (NONE, GZIP, BZIP2, DEFAULT, SNAPPY, SNAPPY_BLOCKED)), false); #undef CASE http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 2e3415f..1063fef 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -226,25 +226,9 @@ Status impala::SetQueryOption(const string& key, const string& value, case TImpalaQueryOptions::NUM_SCANNER_THREADS: query_options->__set_num_scanner_threads(atoi(value.c_str())); break; - case 
TImpalaQueryOptions::ALLOW_UNSUPPORTED_FORMATS: - query_options->__set_allow_unsupported_formats( - iequals(value, "true") || iequals(value, "1")); - break; case TImpalaQueryOptions::DEBUG_ACTION: query_options->__set_debug_action(value.c_str()); break; - case TImpalaQueryOptions::SEQ_COMPRESSION_MODE: { - if (iequals(value, "block")) { - query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::BLOCK); - } else if (iequals(value, "record")) { - query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::RECORD); - } else { - stringstream ss; - ss << "Invalid sequence file compression mode: " << value; - return Status(ss.str()); - } - break; - } case TImpalaQueryOptions::COMPRESSION_CODEC: { if (iequals(value, "none")) { query_options->__set_compression_codec(THdfsCompression::NONE); http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index fce042c..01f6e74 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -44,8 +44,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> TImpalaQueryOptions::ALLOW_ERASURE_CODED_FILES + 1);\ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\ - QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\ - TQueryOptionLevel::DEPRECATED)\ + REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ QUERY_OPT_FN(batch_size, BATCH_SIZE, TQueryOptionLevel::DEVELOPMENT)\ QUERY_OPT_FN(debug_action, DEBUG_ACTION, TQueryOptionLevel::DEVELOPMENT)\ REMOVED_QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT)\ @@ -74,7 +73,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT, 
TQueryOptionLevel::ADVANCED)\ QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT, TQueryOptionLevel::ADVANCED)\ QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS, TQueryOptionLevel::REGULAR)\ - QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE, TQueryOptionLevel::REGULAR)\ + REMOVED_QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\ QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD,\ TQueryOptionLevel::ADVANCED)\ QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS,\ http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 6780138..120aebc 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -101,7 +101,6 @@ struct TQueryOptions { 5: optional i32 num_nodes = NUM_NODES_ALL 6: optional i64 max_scan_range_length = 0 7: optional i32 num_scanner_threads = 0 - 9: optional bool allow_unsupported_formats = 0 11: optional string debug_action = "" 12: optional i64 mem_limit = 0 14: optional CatalogObjects.THdfsCompression compression_codec @@ -133,11 +132,6 @@ struct TQueryOptions { // has no plan hints, and at least one table is missing relevant stats. 29: optional bool disable_unsafe_spills = 0 - // Mode for compression; RECORD, or BLOCK - // This field only applies for certain file types and is ignored - // by all other file types. 
- 30: optional CatalogObjects.THdfsSeqCompressionMode seq_compression_mode - // If the number of rows that are processed for a single query is below the // threshold, it will be executed on the coordinator only with codegen disabled 31: optional i32 exec_single_node_rows_threshold = 100 http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 529af04..665144f 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -72,8 +72,7 @@ enum TImpalaQueryOptions { // Number of scanner threads. NUM_SCANNER_THREADS, - // If true, Impala will try to execute on file formats that are not fully supported yet - ALLOW_UNSUPPORTED_FORMATS, + ALLOW_UNSUPPORTED_FORMATS, // Removed DEFAULT_ORDER_BY_LIMIT, // Removed @@ -110,8 +109,7 @@ enum TImpalaQueryOptions { // Leave blank to use default. COMPRESSION_CODEC, - // Mode for compressing sequence files; either BLOCK, RECORD, or DEFAULT - SEQ_COMPRESSION_MODE, + SEQ_COMPRESSION_MODE, // Removed // HBase scan query option. If set and > 0, HBASE_CACHING is the value for // "hbase.client.Scan.setCaching()" when querying HBase table. 
Otherwise, use backend http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index 54ad57f..b671a1e 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -390,7 +390,6 @@ public class PlannerTestBase extends FrontendTestBase { protected TQueryOptions defaultQueryOptions() { TQueryOptions options = new TQueryOptions(); options.setExplain_level(TExplainLevel.STANDARD); - options.setAllow_unsupported_formats(true); options.setExec_single_node_rows_threshold(0); return options; } http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/bad_avro_snap/README ---------------------------------------------------------------------- diff --git a/testdata/bad_avro_snap/README b/testdata/bad_avro_snap/README index 6271967..71eb398 100644 --- a/testdata/bad_avro_snap/README +++ b/testdata/bad_avro_snap/README @@ -1,6 +1,6 @@ String Data ----------- -Created by modifying Impala's HdfsAvroTableWriter. +Created by modifying Impala's HdfsAvroTableWriter(removed). These files' schemas have a single nullable string column 's'. @@ -14,7 +14,7 @@ truncated_string.avro: contains one value, which is missing the last byte. Float Data ---------- -Created by modifying Impala's HdfsAvroTableWriter. +Created by modifying Impala's HdfsAvroTableWriter(removed). These files' schemas have a single nullable float column 'c1'. 
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test b/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test deleted file mode 100644 index 6dc0899..0000000 --- a/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test +++ /dev/null @@ -1,43 +0,0 @@ -==== ----- QUERY -drop table if exists __avro_write; -==== ----- QUERY -SET COMPRESSION_CODEC=NONE; -create table __avro_write (i int, s string, d double) -stored as AVRO -TBLPROPERTIES ('avro.schema.literal'='{ - "name": "my_record", - "type": "record", - "fields": [ - {"name":"i", "type":["int", "null"]}, - {"name":"s", "type":["string", "null"]}, - {"name":"d", "type":["double", "null"]}]}'); -==== ----- QUERY -SET COMPRESSION_CODEC=NONE; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __avro_write select 0, "a", 1.1; -==== ----- QUERY -SET COMPRESSION_CODEC=SNAPPY; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __avro_write select 1, "b", 2.2; -==== ----- QUERY -select * from __avro_write; ----- RESULTS -0,'a',1.1 -1,'b',2.2 ----- TYPES -INT,STRING,DOUBLE -==== ----- QUERY -SET ALLOW_UNSUPPORTED_FORMATS=0; -insert into __avro_write select 1, "b", 2.2; ----- CATCH -Writing to table format AVRO is not supported. 
Use query option ALLOW_UNSUPPORTED_FORMATS -==== ----- QUERY -drop table __avro_write; -==== http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test deleted file mode 100644 index 7e2363f..0000000 --- a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test +++ /dev/null @@ -1,308 +0,0 @@ -==== ----- QUERY -SET COMPRESSION_CODEC=NONE; -SET ALLOW_UNSUPPORTED_FORMATS=1; -SET SEQ_COMPRESSION_MODE=BLOCK; -create table __seq_write (i int, s string, d double) -stored as SEQUENCEFILE; -==== ----- QUERY -SET COMPRESSION_CODEC=NONE; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write select 0, "a", 1.1; -==== ----- QUERY -SET COMPRESSION_CODEC=DEFAULT; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write values (1, "b", 2.2); -==== ----- QUERY -SET COMPRESSION_CODEC=SNAPPY; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write values (2, "c", 3.3); -==== ----- QUERY -SET COMPRESSION_CODEC=SNAPPY_BLOCKED; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write values (3, "d", 4.4); -==== ----- QUERY -SET COMPRESSION_CODEC=GZIP; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write values (4, "e", 5.5); -==== ----- QUERY -SET COMPRESSION_CODEC=NONE; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write select 5, "a", 1.1; -==== ----- QUERY -SET COMPRESSION_CODEC=DEFAULT; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write values (6, "b", 2.2); -==== ----- QUERY -SET COMPRESSION_CODEC=SNAPPY; -SET 
SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write values (7, "c", 3.3); -==== ----- QUERY -SET COMPRESSION_CODEC=SNAPPY_BLOCKED; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write values (8, "d", 4.4); -==== ----- QUERY -SET COMPRESSION_CODEC=GZIP; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __seq_write values (9, "e", 5.5); -==== ----- QUERY -SET ALLOW_UNSUPPORTED_FORMATS=0; -insert into __seq_write values (4, "e", 5.5); ----- CATCH -Writing to table format SEQUENCE_FILE is not supported. Use query option -==== ----- QUERY -select * from __seq_write; ----- RESULTS -0,'a',1.1 -1,'b',2.2 -2,'c',3.3 -3,'d',4.4 -4,'e',5.5 -5,'a',1.1 -6,'b',2.2 -7,'c',3.3 -8,'d',4.4 -9,'e',5.5 ----- TYPES -INT,STRING,DOUBLE -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with NONE+RECORD and then read -# it back -SET COMPRESSION_CODEC=NONE; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_none_rec like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_none_rec partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from store_sales_seq_none_rec; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with DEFAULT+RECORD and then -# read it back -SET COMPRESSION_CODEC=DEFAULT; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_def_rec like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_def_rec partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 
60000); -==== ----- QUERY -select count(*) from store_sales_seq_def_rec; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+RECORD and -# then read it back -SET COMPRESSION_CODEC=SNAPPY_BLOCKED; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_snapb_rec like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_snapb_rec partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from store_sales_seq_snapb_rec; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with SNAPPY+RECORD and then read -# it back -SET COMPRESSION_CODEC=SNAPPY; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_snap_rec like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_snap_rec partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from store_sales_seq_snap_rec; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with GZIP+RECORD and then read -# it back -SET COMPRESSION_CODEC=GZIP; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_gzip_rec like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_gzip_rec partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from 
store_sales_seq_gzip_rec; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with NONE+BLOCK and then read it -# back -SET COMPRESSION_CODEC=NONE; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_none_block like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_none_block partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from store_sales_seq_none_block; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with DEFAULT+BLOCK and then read -# it back -SET COMPRESSION_CODEC=DEFAULT; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_def_block like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_def_block partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from store_sales_seq_def_block; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+BLOCK and -# then read it back -SET COMPRESSION_CODEC=SNAPPY_BLOCKED; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_snapb_block like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_snapb_block partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from store_sales_seq_snapb_block; ----- RESULTS -60091 
----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with SNAPPY+BLOCK and then read -# it back -SET COMPRESSION_CODEC=SNAPPY; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_snap_block like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_snap_block partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from store_sales_seq_snap_block; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-3079: Create a table containing larger seq files with GZIP+BLOCK and then read it -# back -SET COMPRESSION_CODEC=GZIP; -SET SEQ_COMPRESSION_MODE=BLOCK; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table store_sales_seq_gzip_block like tpcds_parquet.store_sales -stored as SEQUENCEFILE; -insert into store_sales_seq_gzip_block partition(ss_sold_date_sk) -select * from tpcds_parquet.store_sales -where (ss_sold_date_sk between 2451175 and 2451200) or - (ss_sold_date_sk is null and ss_sold_time_sk > 60000); -==== ----- QUERY -select count(*) from store_sales_seq_gzip_block; ----- RESULTS -60091 ----- TYPES -BIGINT -==== ----- QUERY -# IMPALA-5407: Create a table containing seq files with GZIP+RECORD. If the number of -# impalad workers is three, three files will be created, two of which are large enough -# (> 64MB) to force multiple flushes. Make sure that the files have been created -# successfully. 
-SET COMPRESSION_CODEC=GZIP; -SET SEQ_COMPRESSION_MODE=RECORD; -SET ALLOW_UNSUPPORTED_FORMATS=1; -create table catalog_sales_seq_gzip_rec like tpcds.catalog_sales stored as SEQUENCEFILE; -insert into catalog_sales_seq_gzip_rec select * from tpcds.catalog_sales; -==== ----- QUERY -select count(*) from catalog_sales_seq_gzip_rec; ----- RESULTS -1441548 ----- TYPES -BIGINT -==== http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/set.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test index 5a2c56a..ffb53a1 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/set.test +++ b/testdata/workloads/functional-query/queries/QueryTest/set.test @@ -8,7 +8,6 @@ set buffer_pool_limit=7; set all; ---- RESULTS: VERIFY_IS_SUBSET 'ABORT_ON_ERROR','0','REGULAR' -'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED' 'BATCH_SIZE','0','DEVELOPMENT' 'BUFFER_POOL_LIMIT','','ADVANCED' 'DEBUG_ACTION','','DEVELOPMENT' @@ -34,7 +33,6 @@ set explain_level=3; set all; ---- RESULTS: VERIFY_IS_SUBSET 'ABORT_ON_ERROR','0','REGULAR' -'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED' 'BATCH_SIZE','0','DEVELOPMENT' 'BUFFER_POOL_LIMIT','','ADVANCED' 'DEBUG_ACTION','','DEVELOPMENT' @@ -60,7 +58,6 @@ set explain_level='0'; set all; ---- RESULTS: VERIFY_IS_SUBSET 'ABORT_ON_ERROR','0','REGULAR' -'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED' 'BATCH_SIZE','0','DEVELOPMENT' 'BUFFER_POOL_LIMIT','','ADVANCED' 'DEBUG_ACTION','','DEVELOPMENT' http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/text-writer.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/text-writer.test 
b/testdata/workloads/functional-query/queries/QueryTest/text-writer.test deleted file mode 100644 index 89cd730..0000000 --- a/testdata/workloads/functional-query/queries/QueryTest/text-writer.test +++ /dev/null @@ -1,47 +0,0 @@ -==== ----- QUERY -drop table if exists __text_write; -==== ----- QUERY -create table __text_write (i int, s string, d double); -==== ----- QUERY -SET COMPRESSION_CODEC=NONE; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __text_write select 0, "a", 1.1; -==== ----- QUERY -SET COMPRESSION_CODEC=DEFAULT; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __text_write values (1, "b", 2.2); -==== ----- QUERY -SET COMPRESSION_CODEC=SNAPPY; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __text_write values (2, "c", 3.3); -==== ----- QUERY -SET COMPRESSION_CODEC=GZIP; -SET ALLOW_UNSUPPORTED_FORMATS=1; -insert into __text_write values (3, "d", 4.4); -==== ----- QUERY -SET COMPRESSION_CODEC=GZIP; -SET ALLOW_UNSUPPORTED_FORMATS=0; -insert into __text_write values (3, "d", 4.4); ----- CATCH -Writing to compressed text table is not supported. 
-==== ----- QUERY -select * from __text_write; ----- RESULTS -0,'a',1.1 -1,'b',2.2 -2,'c',3.3 -3,'d',4.4 ----- TYPES -INT,STRING,DOUBLE -==== ----- QUERY -drop table __text_write; -==== http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test b/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test new file mode 100644 index 0000000..68f355f --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test @@ -0,0 +1,77 @@ +==== +---- QUERY +create table __text_write (i int, s string, d double); +==== +---- QUERY +SET COMPRESSION_CODEC=NONE; +insert into __text_write select 0, "a", 1.1; +==== +---- QUERY +SET COMPRESSION_CODEC=GZIP; +insert into __text_write values (3, "d", 4.4); +---- CATCH +Writing to compressed text table is not supported. +==== +---- QUERY +select * from __text_write; +---- RESULTS +0,'a',1.1 +---- TYPES +INT,STRING,DOUBLE +==== +---- QUERY +create table __avro_write (i int, s string, d double) +stored as AVRO +TBLPROPERTIES ('avro.schema.literal'='{ + "name": "my_record", + "type": "record", + "fields": [ + {"name":"i", "type":["int", "null"]}, + {"name":"s", "type":["string", "null"]}, + {"name":"d", "type":["double", "null"]}]}'); +==== +---- QUERY +insert into __avro_write select 1, "b", 2.2; +---- CATCH +Writing to table format AVRO is not supported. +==== +---- QUERY +create table __seq_write (i int, s string, d double) +stored as SEQUENCEFILE; +==== +---- QUERY +insert into __seq_write values (4, "e", 5.5); +---- CATCH +Writing to table format SEQUENCE_FILE is not supported. 
+==== +---- QUERY +# Test writing to mixed format table containing partitions in both supported and +# unsupported formats where writing to the partition with supported format should succeed. +# Create a table containing both text(supported) and avro(unsupported) partitions. +create table __mixed_format_write (id int) partitioned by (part int); +==== +---- QUERY +insert into __mixed_format_write partition(part=2000) values(1); +==== +---- QUERY +insert into __mixed_format_write partition(part=2001) values(2); +==== +---- QUERY +alter table __mixed_format_write partition (part=2001) set fileformat AVRO; +==== +---- QUERY +insert into __mixed_format_write partition(part=2000) values(3); +==== +---- QUERY +insert into __mixed_format_write partition(part=2001) values(4); +---- CATCH +Writing to table format AVRO is not supported. +==== +---- QUERY +select id, part from __mixed_format_write where part = 2000; +---- RESULTS +1,2000 +3,2000 +---- TYPES +INT,INT +==== http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/common/test_dimensions.py ---------------------------------------------------------------------- diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py index 785cfa9..0460ea7 100644 --- a/tests/common/test_dimensions.py +++ b/tests/common/test_dimensions.py @@ -108,19 +108,6 @@ def create_parquet_dimension(workload): return ImpalaTestDimension('table_format', TableFormatInfo.create_from_string(dataset, 'parquet/none')) -# Available Exec Options: -#01: abort_on_error (bool) -#02 max_errors (i32) -#03: disable_codegen (bool) -#04: batch_size (i32) -#05: return_as_ascii (bool) -#06: num_nodes (i32) -#07: max_scan_range_length (i64) -#08: num_scanner_threads (i32) -#09: max_io_buffers (i32) -#10: allow_unsupported_formats (bool) -#11: partition_agg (bool) - # Common sets of values for the exec option vectors ALL_BATCH_SIZES = [0] http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/hs2/test_hs2.py 
---------------------------------------------------------------------- diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py index cd861e9..795f45c 100644 --- a/tests/hs2/test_hs2.py +++ b/tests/hs2/test_hs2.py @@ -89,11 +89,10 @@ class TestHS2(HS2TestSuite): # Should be unchanged assert vals2["SYNC_DDL"] == "0" - # Verify that 'DEVELOPMENT' and 'DEPRECATED' options are not returned. assert "MAX_ERRORS" in vals2 assert levels["MAX_ERRORS"] == "ADVANCED" + # Verify that 'DEVELOPMENT' options are not returned. assert "DEBUG_ACTION" not in vals2 - assert "ALLOW_UNSUPPORTED_FORMATS" not in vals2 # Removed options should not be returned. assert "MAX_IO_BUFFERS" not in vals2 @@ -101,7 +100,8 @@ class TestHS2(HS2TestSuite): @needs_session() def test_session_option_levels_via_set_all(self): """ - Tests the level of session options returned by a SET ALL query. + Tests the level of session options returned by a SET ALL query except DEPRECATED as we + currently do not have any of those left. """ vals, levels = self.get_session_options("SET ALL") @@ -109,12 +109,10 @@ class TestHS2(HS2TestSuite): assert "SYNC_DDL" in vals assert "MAX_ERRORS" in vals assert "DEBUG_ACTION" in vals - assert "ALLOW_UNSUPPORTED_FORMATS" in vals assert levels["COMPRESSION_CODEC"] == "REGULAR" assert levels["SYNC_DDL"] == "REGULAR" assert levels["MAX_ERRORS"] == "ADVANCED" assert levels["DEBUG_ACTION"] == "DEVELOPMENT" - assert levels["ALLOW_UNSUPPORTED_FORMATS"] == "DEPRECATED" # Removed options should not be returned. 
assert "MAX_IO_BUFFERS" not in vals http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/metadata/test_partition_metadata.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py index 1d77aa5..d23e3f0 100644 --- a/tests/metadata/test_partition_metadata.py +++ b/tests/metadata/test_partition_metadata.py @@ -45,10 +45,7 @@ class TestPartitionMetadata(ImpalaTestSuite): # compression codecs. cls.ImpalaTestMatrix.add_constraint(lambda v: (v.get_value('table_format').file_format in ('text', 'parquet') and - v.get_value('table_format').compression_codec == 'none') or - (v.get_value('table_format').file_format in ('seq', 'avro') and - v.get_value('table_format').compression_codec == 'snap' and - v.get_value('table_format').compression_type == 'block')) + v.get_value('table_format').compression_codec == 'none')) @SkipIfLocal.hdfs_client # TODO: this dependency might not exist anymore def test_multiple_partitions_same_location(self, vector, unique_database): @@ -70,9 +67,6 @@ class TestPartitionMetadata(ImpalaTestSuite): self.client.execute("alter table %s add partition (j=2) location '%s/p'" % (FQ_TBL_NAME, TBL_LOCATION)) - # Allow unsupported avro and sequence file writer. - self.client.execute("set allow_unsupported_formats=true") - # Insert some data. This will only update partition j=1 (IMPALA-1480). self.client.execute("insert into table %s partition(j=1) select 1" % FQ_TBL_NAME) # Refresh to update file metadata of both partitions @@ -80,31 +74,19 @@ class TestPartitionMetadata(ImpalaTestSuite): # The data will be read twice because each partition points to the same location. data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME) - if file_format == 'avro': - # Avro writer is broken and produces nulls. Only check partition column. 
- assert data.split('\t')[1] == '3' - else: - assert data.split('\t') == ['2', '3'] + assert data.split('\t') == ['2', '3'] self.client.execute("insert into %s partition(j) select 1, 1" % FQ_TBL_NAME) self.client.execute("insert into %s partition(j) select 1, 2" % FQ_TBL_NAME) self.client.execute("refresh %s" % FQ_TBL_NAME) data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME) - if file_format == 'avro': - # Avro writer is broken and produces nulls. Only check partition column. - assert data.split('\t')[1] == '9' - else: - assert data.split('\t') == ['6', '9'] + assert data.split('\t') == ['6', '9'] # Force all scan ranges to be on the same node. It should produce the same # result as above. See IMPALA-5412. self.client.execute("set num_nodes=1") data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME) - if file_format == 'avro': - # Avro writer is broken and produces nulls. Only check partition column. - assert data.split('\t')[1] == '9' - else: - assert data.split('\t') == ['6', '9'] + assert data.split('\t') == ['6', '9'] @SkipIfS3.hive @SkipIfADLS.hive http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/query_test/test_compressed_formats.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py index 694cfe9..2896632 100644 --- a/tests/query_test/test_compressed_formats.py +++ b/tests/query_test/test_compressed_formats.py @@ -129,71 +129,25 @@ class TestCompressedFormats(ImpalaTestSuite): finally: call(["hive", "-e", drop_cmd]); -class TestTableWriters(ImpalaTestSuite): +class TestUnsupportedTableWriters(ImpalaTestSuite): @classmethod def get_workload(cls): return 'functional-query' @classmethod def add_test_dimensions(cls): - super(TestTableWriters, cls).add_test_dimensions() + super(TestUnsupportedTableWriters, cls).add_test_dimensions() 
cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) - # This class tests many formats, but doesn't use the contraints - # Each format is tested within one test file, we constrain to text/none - # as each test file only needs to be run once. + # This class tests different formats, but doesn't use constraints. + # The constraint added below is only to make sure that the test file runs once. cls.ImpalaTestMatrix.add_constraint(lambda v: (v.get_value('table_format').file_format =='text' and v.get_value('table_format').compression_codec == 'none')) - def test_seq_writer(self, vector, unique_database): - self.run_test_case('QueryTest/seq-writer', vector, unique_database) - - @SkipIfS3.hive - @SkipIfADLS.hive - @SkipIfIsilon.hive - @SkipIfLocal.hive - def test_seq_writer_hive_compatibility(self, vector, unique_database): - self.client.execute('set ALLOW_UNSUPPORTED_FORMATS=1') - # Write sequence files with different compression codec/compression mode and then read - # it back in Impala and Hive. - # Note that we don't test snappy here as the snappy codec used by Impala does not seem - # to be fully compatible with the snappy codec used by Hive. 
- for comp_codec, comp_mode in [('NONE', 'RECORD'), ('NONE', 'BLOCK'), - ('DEFAULT', 'RECORD'), ('DEFAULT', 'BLOCK'), - ('GZIP', 'RECORD'), ('GZIP', 'BLOCK')]: - table_name = '%s.seq_tbl_%s_%s' % (unique_database, comp_codec, comp_mode) - self.client.execute('set COMPRESSION_CODEC=%s' % comp_codec) - self.client.execute('set SEQ_COMPRESSION_MODE=%s' % comp_mode) - self.client.execute('create table %s like functional.zipcode_incomes stored as ' - 'sequencefile' % table_name) - # Write sequence file of size greater than 4K - self.client.execute('insert into %s select * from functional.zipcode_incomes where ' - 'zip >= "5"' % table_name) - # Write sequence file of size less than 4K - self.client.execute('insert into %s select * from functional.zipcode_incomes where ' - 'zip="00601"' % table_name) - - count_query = 'select count(*) from %s' % table_name - - # Read it back in Impala - output = self.client.execute(count_query) - assert '16541' == output.get_data() - # Read it back in Hive - # Note that username is passed in for the sake of remote cluster tests. The default - # HDFS user is typically 'hdfs', and this is needed to run a count() operation using - # hive. For local mini clusters, the usename can be anything. See IMPALA-5413. - output = self.run_stmt_in_hive(count_query, username='hdfs') - assert '16541' == output.split('\n')[1] - - def test_avro_writer(self, vector): - self.run_test_case('QueryTest/avro-writer', vector) - - def test_text_writer(self, vector): - # TODO debug this test. - # This caused by a zlib failure. Suspected cause is too small a buffer - # passed to zlib for compression; similar to IMPALA-424 - pytest.skip() - self.run_test_case('QueryTest/text-writer', vector) + def test_error_message(self, vector, unique_database): + # Tests that an appropriate error message is displayed for unsupported writers like + # compressed text, avro and sequence. 
+ self.run_test_case('QueryTest/unsupported-writers', vector, unique_database) @pytest.mark.execute_serially class TestLargeCompressedFile(ImpalaTestSuite): http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/shell/test_shell_interactive.py ---------------------------------------------------------------------- diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py index fe631cf..eac9d27 100755 --- a/tests/shell/test_shell_interactive.py +++ b/tests/shell/test_shell_interactive.py @@ -389,11 +389,10 @@ class TestImpalaShellInteractive(object): assert "APPX_COUNT_DISTINCT" in result.stdout assert "SUPPORT_START_OVER" in result.stdout # Development, deprecated and removed options should not be shown. + # Note: there are currently no deprecated options assert "Development Query Options:" not in result.stdout - assert "DEBUG_ACTION" not in result.stdout - assert "Deprecated Query Options:" not in result.stdout - assert "ALLOW_UNSUPPORTED_FORMATS" not in result.stdout - assert "MAX_IO_BUFFERS" not in result.stdout + assert "DEBUG_ACTION" not in result.stdout # Development option. + assert "MAX_IO_BUFFERS" not in result.stdout # Removed option. 
shell2 = ImpalaShell() shell2.send_cmd("set all") @@ -401,7 +400,7 @@ class TestImpalaShellInteractive(object): assert "Query options (defaults shown in []):" in result.stdout assert "Advanced Query Options:" in result.stdout assert "Development Query Options:" in result.stdout - assert "Deprecated Query Options:" in result.stdout + assert "Deprecated Query Options:" not in result.stdout advanced_part_start_idx = result.stdout.find("Advanced Query Options") development_part_start_idx = result.stdout.find("Development Query Options") deprecated_part_start_idx = result.stdout.find("Deprecated Query Options") @@ -411,7 +410,6 @@ class TestImpalaShellInteractive(object): assert "APPX_COUNT_DISTINCT" in advanced_part assert "SUPPORT_START_OVER" in advanced_part assert "DEBUG_ACTION" in development_part - assert "ALLOW_UNSUPPORTED_FORMATS" in result.stdout[deprecated_part_start_idx:] # Removed options should not be shown. assert "MAX_IO_BUFFERS" not in result.stdout
