IMPALA-1427: Improvements to "Unknown disk-ID" warning

- Removes the runtime unknown disk ID reporting and instead moves it to
  the explain plan as a counter that prints the number of scan ranges
  missing disk IDs in the corresponding HDFS scan nodes.
- Adds a warning to the header of query profile/explain plan with a list
  of tables missing disk ids.
- Removes reference to enabling dfs block metadata configuration, since
  it doesn't apply anymore.
- Removes VolumeId terminology from the runtime profile.

Change-Id: Iddb132ff7ad66f3291b93bf9d8061bd0525ef1b2
Reviewed-on: http://gerrit.cloudera.org:8080/5828
Reviewed-by: Bharath Vissapragada <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fcc2d817
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fcc2d817
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fcc2d817

Branch: refs/heads/master
Commit: fcc2d817b857fd0d57eabe2b5af89631e58c8186
Parents: d074f71
Author: Bharath Vissapragada <[email protected]>
Authored: Mon Jan 30 10:37:41 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Feb 8 02:02:15 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc              |  16 +-
 be/src/exec/hdfs-scan-node-base.h               |   4 -
 be/src/service/query-exec-state.cc              |  13 ++
 common/thrift/ImpalaInternalService.thrift      |   3 +
 common/thrift/generate_error_codes.py           | 154 +++++++++----------
 .../org/apache/impala/planner/HdfsScanNode.java |  51 +++++-
 .../java/org/apache/impala/planner/Planner.java |  10 ++
 .../org/apache/impala/service/Frontend.java     |  17 +-
 8 files changed, 166 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fcc2d817/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 7b00fcc..3a5719f 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -59,9 +59,9 @@ DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
     "that a scan node will wait for expected runtime filters to arrive.");
-DEFINE_bool(suppress_unknown_disk_id_warnings, false,
-    "Suppress unknown disk id warnings generated when the HDFS implementation does not"
-    " provide volume/disk information.");
+
+// TODO: Remove this flag in a compatibility-breaking release.
+DEFINE_bool(suppress_unknown_disk_id_warnings, false, "Deprecated."); #ifndef NDEBUG DECLARE_bool(skip_file_runtime_filtering); @@ -89,7 +89,6 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, reader_context_(NULL), tuple_desc_(NULL), hdfs_table_(NULL), - unknown_disk_id_warned_(false), initial_ranges_issued_(false), counters_running_(false), max_compressed_text_file_length_(NULL), @@ -263,14 +262,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { } bool expected_local = params.__isset.is_remote && !params.is_remote; - if (expected_local && params.volume_id == -1) { - if (!FLAGS_suppress_unknown_disk_id_warnings && !unknown_disk_id_warned_) { - runtime_profile()->AppendExecOption("Missing Volume Id"); - runtime_state()->LogError(ErrorMsg(TErrorCode::HDFS_SCAN_NODE_UNKNOWN_DISK)); - unknown_disk_id_warned_ = true; - } - ++num_ranges_missing_volume_id; - } + if (expected_local && params.volume_id == -1) ++num_ranges_missing_volume_id; bool try_cache = params.is_cached; if (runtime_state_->query_options().disable_cached_reads) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fcc2d817/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 785674c..6313f8b 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -305,10 +305,6 @@ class HdfsScanNodeBase : public ScanNode { /// The root of the table's Avro schema, if we're scanning an Avro table. ScopedAvroSchemaElement avro_schema_; - /// If true, the warning that some disk ids are unknown was logged. Only log this once - /// per scan node since it can be noisy. - bool unknown_disk_id_warned_; - /// Partitions scanned by this scan node. std::unordered_set<int64_t> partition_ids_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fcc2d817/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index fb23e21..2cf2e5d 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -60,6 +60,7 @@ static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem"; static const string PER_HOST_VCORES_KEY = "Estimated Per-Host VCores"; static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats"; static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats"; +static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids"; ImpalaServer::QueryExecState::QueryExecState(const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend, ImpalaServer* server, @@ -428,6 +429,18 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); } + if (query_exec_request.query_ctx.__isset.tables_missing_diskids && + !query_exec_request.query_ctx.tables_missing_diskids.empty()) { + stringstream ss; + const vector<TTableName>& tbls = + query_exec_request.query_ctx.tables_missing_diskids; + for (int i = 0; i < tbls.size(); ++i) { + if (i != 0) ss << ","; + ss << tbls[i].db_name << "." << tbls[i].table_name; + } + summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str()); + } + { lock_guard<mutex> l(lock_); // Don't start executing the query if Cancel() was called concurrently with Exec(). 
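
To make the relocated reporting concrete, here is roughly what the new output
looks like once this patch is applied. The table names and counts are invented
for illustration; the formats follow the strings added in query-exec-state.cc
above and in Planner.java/HdfsScanNode.java below.

  Runtime profile summary:
    Tables With Missing Disk Ids: functional.alltypes,tpch.lineitem

  Explain-plan/profile header:
    WARNING: The following tables have scan ranges with missing disk id information.
    functional.alltypes, tpch.lineitem

  Per HDFS scan node (EXTENDED explain level):
    missing disk ids: partitions=1/24 files=3/24 scan ranges 3/24
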
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fcc2d817/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index e9b962e..63cb11a 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -322,6 +322,9 @@ struct TQueryCtx { // backend in NativeEvalExprsWithoutRow() in FESupport. This flag is only advisory to // avoid the overhead of codegen and can be ignored if codegen is needed functionally. 14: optional bool disable_codegen_hint = false; + + // List of tables with scan ranges that map to blocks with missing disk IDs. + 15: optional list<CatalogObjects.TTableName> tables_missing_diskids } // Context to collect information, which is shared among all instances of that plan http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fcc2d817/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index b1ff4c8..2e45f2b 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -94,220 +94,216 @@ error_codes = ( ("SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT", 25, "Snappy: Decompressed size is not correct."), - ("HDFS_SCAN_NODE_UNKNOWN_DISK", 26, "Unknown disk id. " - "This will negatively affect performance. " - "Check your hdfs settings to enable block location metadata."), - - ("FRAGMENT_EXECUTOR", 27, "Reserved resource size ($0) is larger than " + ("FRAGMENT_EXECUTOR", 26, "Reserved resource size ($0) is larger than " "query mem limit ($1), and will be restricted to $1. Configure the reservation " "size by setting RM_INITIAL_MEM."), - ("PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH", 28, + ("PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH", 27, "Cannot perform join at hash join node with id $0." " The input data was partitioned the maximum number of $1 times." " This could mean there is significant skew in the data or the memory limit is" " set too low."), - ("PARTITIONED_AGG_MAX_PARTITION_DEPTH", 29, + ("PARTITIONED_AGG_MAX_PARTITION_DEPTH", 28, "Cannot perform aggregation at hash aggregation node with id $0." " The input data was partitioned the maximum number of $1 times." " This could mean there is significant skew in the data or the memory limit is" " set too low."), - ("MISSING_BUILTIN", 30, "Builtin '$0' with symbol '$1' does not exist. " + ("MISSING_BUILTIN", 29, "Builtin '$0' with symbol '$1' does not exist. " "Verify that all your impalads are the same version."), - ("RPC_GENERAL_ERROR", 31, "RPC Error: $0"), - ("RPC_RECV_TIMEOUT", 32, "RPC recv timed out: $0"), + ("RPC_GENERAL_ERROR", 30, "RPC Error: $0"), + ("RPC_RECV_TIMEOUT", 31, "RPC recv timed out: $0"), - ("UDF_VERIFY_FAILED", 33, + ("UDF_VERIFY_FAILED", 32, "Failed to verify function $0 from LLVM module $1, see log for more details."), - ("PARQUET_CORRUPT_RLE_BYTES", 34, "File $0 corrupt. RLE level data bytes = $1"), + ("PARQUET_CORRUPT_RLE_BYTES", 33, "File $0 corrupt. RLE level data bytes = $1"), - ("AVRO_DECIMAL_RESOLUTION_ERROR", 35, "Column '$0' has conflicting Avro decimal types. " + ("AVRO_DECIMAL_RESOLUTION_ERROR", 34, "Column '$0' has conflicting Avro decimal types. " "Table schema $1: $2, file schema $1: $3"), - ("AVRO_DECIMAL_METADATA_MISMATCH", 36, "Column '$0' has conflicting Avro decimal types. 
" + ("AVRO_DECIMAL_METADATA_MISMATCH", 35, "Column '$0' has conflicting Avro decimal types. " "Declared $1: $2, $1 in table's Avro schema: $3"), - ("AVRO_SCHEMA_RESOLUTION_ERROR", 37, "Unresolvable types for column '$0': " + ("AVRO_SCHEMA_RESOLUTION_ERROR", 36, "Unresolvable types for column '$0': " "table type: $1, file type: $2"), - ("AVRO_SCHEMA_METADATA_MISMATCH", 38, "Unresolvable types for column '$0': " + ("AVRO_SCHEMA_METADATA_MISMATCH", 37, "Unresolvable types for column '$0': " "declared column type: $1, table's Avro schema type: $2"), - ("AVRO_UNSUPPORTED_DEFAULT_VALUE", 39, "Field $0 is missing from file and default " + ("AVRO_UNSUPPORTED_DEFAULT_VALUE", 38, "Field $0 is missing from file and default " "values of type $1 are not yet supported."), - ("AVRO_MISSING_FIELD", 40, "Inconsistent table metadata. Mismatch between column " + ("AVRO_MISSING_FIELD", 39, "Inconsistent table metadata. Mismatch between column " "definition and Avro schema: cannot read field $0 because there are only $1 fields."), - ("AVRO_MISSING_DEFAULT", 41, + ("AVRO_MISSING_DEFAULT", 40, "Field $0 is missing from file and does not have a default value."), - ("AVRO_NULLABILITY_MISMATCH", 42, + ("AVRO_NULLABILITY_MISMATCH", 41, "Field $0 is nullable in the file schema but not the table schema."), - ("AVRO_NOT_A_RECORD", 43, + ("AVRO_NOT_A_RECORD", 42, "Inconsistent table metadata. Field $0 is not a record in the Avro schema."), - ("PARQUET_DEF_LEVEL_ERROR", 44, "Could not read definition level, even though metadata" + ("PARQUET_DEF_LEVEL_ERROR", 43, "Could not read definition level, even though metadata" " states there are $0 values remaining in data page. file=$1"), - ("PARQUET_NUM_COL_VALS_ERROR", 45, "Mismatched number of values in column index $0 " + ("PARQUET_NUM_COL_VALS_ERROR", 44, "Mismatched number of values in column index $0 " "($1 vs. $2). file=$3"), - ("PARQUET_DICT_DECODE_FAILURE", 46, "File '$0' is corrupt: error decoding " + ("PARQUET_DICT_DECODE_FAILURE", 45, "File '$0' is corrupt: error decoding " "dictionary-encoded value of type $1 at offset $2"), - ("SSL_PASSWORD_CMD_FAILED", 47, + ("SSL_PASSWORD_CMD_FAILED", 46, "SSL private-key password command ('$0') failed with error: $1"), - ("SSL_CERTIFICATE_PATH_BLANK", 48, "The SSL certificate path is blank"), - ("SSL_PRIVATE_KEY_PATH_BLANK", 49, "The SSL private key path is blank"), + ("SSL_CERTIFICATE_PATH_BLANK", 47, "The SSL certificate path is blank"), + ("SSL_PRIVATE_KEY_PATH_BLANK", 48, "The SSL private key path is blank"), - ("SSL_CERTIFICATE_NOT_FOUND", 50, "The SSL certificate file does not exist at path $0"), - ("SSL_PRIVATE_KEY_NOT_FOUND", 51, "The SSL private key file does not exist at path $0"), + ("SSL_CERTIFICATE_NOT_FOUND", 49, "The SSL certificate file does not exist at path $0"), + ("SSL_PRIVATE_KEY_NOT_FOUND", 50, "The SSL private key file does not exist at path $0"), - ("SSL_SOCKET_CREATION_FAILED", 52, "SSL socket creation failed: $0"), + ("SSL_SOCKET_CREATION_FAILED", 51, "SSL socket creation failed: $0"), - ("MEM_ALLOC_FAILED", 53, "Memory allocation of $0 bytes failed"), + ("MEM_ALLOC_FAILED", 52, "Memory allocation of $0 bytes failed"), - ("PARQUET_REP_LEVEL_ERROR", 54, "Could not read repetition level, even though metadata" + ("PARQUET_REP_LEVEL_ERROR", 53, "Could not read repetition level, even though metadata" " states there are $0 values remaining in data page. 
file=$1"), - ("PARQUET_UNRECOGNIZED_SCHEMA", 55, "File '$0' has an incompatible Parquet schema for " + ("PARQUET_UNRECOGNIZED_SCHEMA", 54, "File '$0' has an incompatible Parquet schema for " "column '$1'. Column type: $2, Parquet schema:\\n$3"), - ("COLLECTION_ALLOC_FAILED", 56, "Failed to allocate $0 bytes for collection '$1'.\\n" + ("COLLECTION_ALLOC_FAILED", 55, "Failed to allocate $0 bytes for collection '$1'.\\n" "Current buffer size: $2 num tuples: $3."), - ("TMP_DEVICE_BLACKLISTED", 57, + ("TMP_DEVICE_BLACKLISTED", 56, "Temporary device for directory $0 is blacklisted from a previous error and cannot " "be used."), - ("TMP_FILE_BLACKLISTED", 58, + ("TMP_FILE_BLACKLISTED", 57, "Temporary file $0 is blacklisted from a previous error and cannot be expanded."), - ("RPC_CLIENT_CONNECT_FAILURE", 59, + ("RPC_CLIENT_CONNECT_FAILURE", 58, "RPC client failed to connect: $0"), - ("STALE_METADATA_FILE_TOO_SHORT", 60, "Metadata for file '$0' appears stale. " + ("STALE_METADATA_FILE_TOO_SHORT", 59, "Metadata for file '$0' appears stale. " "Try running \\\"refresh $1\\\" to reload the file metadata."), - ("PARQUET_BAD_VERSION_NUMBER", 61, "File '$0' has an invalid version number: $1\\n" + ("PARQUET_BAD_VERSION_NUMBER", 60, "File '$0' has an invalid version number: $1\\n" "This could be due to stale metadata. Try running \\\"refresh $2\\\"."), - ("SCANNER_INCOMPLETE_READ", 62, "Tried to read $0 bytes but could only read $1 bytes. " + ("SCANNER_INCOMPLETE_READ", 61, "Tried to read $0 bytes but could only read $1 bytes. " "This may indicate data file corruption. (file $2, byte offset: $3)"), - ("SCANNER_INVALID_READ", 63, "Invalid read of $0 bytes. This may indicate data file " + ("SCANNER_INVALID_READ", 62, "Invalid read of $0 bytes. This may indicate data file " "corruption. (file $1, byte offset: $2)"), - ("AVRO_BAD_VERSION_HEADER", 64, "File '$0' has an invalid version header: $1\\n" + ("AVRO_BAD_VERSION_HEADER", 63, "File '$0' has an invalid version header: $1\\n" "Make sure the file is an Avro data file."), - ("UDF_MEM_LIMIT_EXCEEDED", 65, "$0's allocations exceeded memory limits."), + ("UDF_MEM_LIMIT_EXCEEDED", 64, "$0's allocations exceeded memory limits."), - ("BTS_BLOCK_OVERFLOW", 66, "Cannot process row that is bigger than the IO size " + ("BTS_BLOCK_OVERFLOW", 65, "Cannot process row that is bigger than the IO size " "(row_size=$0, null_indicators_size=$1). To run this query, increase the IO size " "(--read_size option)."), - ("COMPRESSED_FILE_MULTIPLE_BLOCKS", 67, + ("COMPRESSED_FILE_MULTIPLE_BLOCKS", 66, "For better performance, snappy-, gzip-, and bzip-compressed files " "should not be split into multiple HDFS blocks. file=$0 offset $1"), - ("COMPRESSED_FILE_BLOCK_CORRUPTED", 68, + ("COMPRESSED_FILE_BLOCK_CORRUPTED", 67, "$0 Data error, likely data corrupted in this block."), - ("COMPRESSED_FILE_DECOMPRESSOR_ERROR", 69, "$0 Decompressor error at $1, code=$2"), + ("COMPRESSED_FILE_DECOMPRESSOR_ERROR", 68, "$0 Decompressor error at $1, code=$2"), - ("COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS", 70, + ("COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS", 69, "Decompression failed to make progress, but end of input is not reached. " "File appears corrupted. file=$0"), - ("COMPRESSED_FILE_TRUNCATED", 71, + ("COMPRESSED_FILE_TRUNCATED", 70, "Unexpected end of compressed file. File may be truncated. 
file=$0"), - ("DATASTREAM_SENDER_TIMEOUT", 72, "Sender timed out waiting for receiver fragment " + ("DATASTREAM_SENDER_TIMEOUT", 71, "Sender timed out waiting for receiver fragment " "instance: $0"), - ("KUDU_IMPALA_TYPE_MISSING", 73, "Kudu type $0 is not available in Impala."), + ("KUDU_IMPALA_TYPE_MISSING", 72, "Kudu type $0 is not available in Impala."), - ("IMPALA_KUDU_TYPE_MISSING", 74, "Impala type $0 is not available in Kudu."), + ("IMPALA_KUDU_TYPE_MISSING", 73, "Impala type $0 is not available in Kudu."), - ("KUDU_NOT_SUPPORTED_ON_OS", 75, "Kudu is not supported on this operating system."), + ("KUDU_NOT_SUPPORTED_ON_OS", 74, "Kudu is not supported on this operating system."), - ("KUDU_NOT_ENABLED", 76, "Kudu features are disabled by the startup flag " + ("KUDU_NOT_ENABLED", 75, "Kudu features are disabled by the startup flag " "--disable_kudu."), - ("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 77, "Cannot perform hash join at node with " + ("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 76, "Cannot perform hash join at node with " "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning " "level $1. Number of rows $2."), - ("PARTITIONED_AGG_REPARTITION_FAILS", 78, "Cannot perform aggregation at node with " + ("PARTITIONED_AGG_REPARTITION_FAILS", 77, "Cannot perform aggregation at node with " "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning " "level $1. Number of rows $2."), - ("AVRO_TRUNCATED_BLOCK", 79, "File '$0' is corrupt: truncated data block at offset $1"), + ("AVRO_TRUNCATED_BLOCK", 78, "File '$0' is corrupt: truncated data block at offset $1"), - ("AVRO_INVALID_UNION", 80, "File '$0' is corrupt: invalid union value $1 at offset $2"), + ("AVRO_INVALID_UNION", 79, "File '$0' is corrupt: invalid union value $1 at offset $2"), - ("AVRO_INVALID_BOOLEAN", 81, "File '$0' is corrupt: invalid boolean value $1 at offset " + ("AVRO_INVALID_BOOLEAN", 80, "File '$0' is corrupt: invalid boolean value $1 at offset " "$2"), - ("AVRO_INVALID_LENGTH", 82, "File '$0' is corrupt: invalid length $1 at offset $2"), + ("AVRO_INVALID_LENGTH", 81, "File '$0' is corrupt: invalid length $1 at offset $2"), - ("SCANNER_INVALID_INT", 83, "File '$0' is corrupt: invalid encoded integer at offset $1"), + ("SCANNER_INVALID_INT", 82, "File '$0' is corrupt: invalid encoded integer at offset $1"), - ("AVRO_INVALID_RECORD_COUNT", 84, "File '$0' is corrupt: invalid record count $1 at " + ("AVRO_INVALID_RECORD_COUNT", 83, "File '$0' is corrupt: invalid record count $1 at " "offset $2"), - ("AVRO_INVALID_COMPRESSED_SIZE", 85, "File '$0' is corrupt: invalid compressed block " + ("AVRO_INVALID_COMPRESSED_SIZE", 84, "File '$0' is corrupt: invalid compressed block " "size $1 at offset $2"), - ("AVRO_INVALID_METADATA_COUNT", 86, "File '$0' is corrupt: invalid metadata count $1 " + ("AVRO_INVALID_METADATA_COUNT", 85, "File '$0' is corrupt: invalid metadata count $1 " "at offset $2"), - ("SCANNER_STRING_LENGTH_OVERFLOW", 87, "File '$0' could not be read: string $1 was " + ("SCANNER_STRING_LENGTH_OVERFLOW", 86, "File '$0' could not be read: string $1 was " "longer than supported limit of $2 bytes at offset $3"), - ("PARQUET_CORRUPT_PLAIN_VALUE", 88, "File '$0' is corrupt: error decoding value of type " + ("PARQUET_CORRUPT_PLAIN_VALUE", 87, "File '$0' is corrupt: error decoding value of type " "$1 at offset $2"), - ("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for " + ("PARQUET_CORRUPT_DICTIONARY", 88, "File '$0' is corrupt: 
error reading dictionary for " "data of type $1: $2"), - ("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum " + ("TEXT_PARSER_TRUNCATED_COLUMN", 89, "Length of column is $0 which exceeds maximum " "supported length of 2147483647 bytes."), - ("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for query " + ("SCRATCH_LIMIT_EXCEEDED", 90, "Scratch space limit of $0 bytes exceeded for query " "while spilling data to disk."), - ("BUFFER_ALLOCATION_FAILED", 92, "Unexpected error allocating $0 byte buffer."), + ("BUFFER_ALLOCATION_FAILED", 91, "Unexpected error allocating $0 byte buffer."), - ("PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE", 93, "File '$0' is corrupt: metadata indicates " + ("PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE", 92, "File '$0' is corrupt: metadata indicates " "a zero row count but there is at least one non-empty row group."), - ("NO_REGISTERED_BACKENDS", 94, "Cannot schedule query: no registered backends " + ("NO_REGISTERED_BACKENDS", 93, "Cannot schedule query: no registered backends " "available."), - ("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."), + ("KUDU_KEY_ALREADY_PRESENT", 94, "Key already present in Kudu table '$0'."), - ("KUDU_NOT_FOUND", 96, "Not found in Kudu table '$0': $1"), + ("KUDU_NOT_FOUND", 95, "Not found in Kudu table '$0': $1"), - ("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1"), + ("KUDU_SESSION_ERROR", 96, "Error in Kudu table '$0': $1"), - ("AVRO_UNSUPPORTED_TYPE", 98, "Column '$0': unsupported Avro type '$1'"), + ("AVRO_UNSUPPORTED_TYPE", 97, "Column '$0': unsupported Avro type '$1'"), - ("AVRO_INVALID_DECIMAL", 99, + ("AVRO_INVALID_DECIMAL", 98, "Column '$0': invalid Avro decimal type with precision = '$1' scale = '$2'"), - ("KUDU_NULL_CONSTRAINT_VIOLATION", 100, + ("KUDU_NULL_CONSTRAINT_VIOLATION", 99, "Row with null value violates nullability constraint on table '$0'."), - ("PARQUET_TIMESTAMP_OUT_OF_RANGE", 101, + ("PARQUET_TIMESTAMP_OUT_OF_RANGE", 100, "Parquet file '$0' column '$1' contains an out of range timestamp. 
" "The valid date range is 1400-01-01..9999-12-31."), ) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fcc2d817/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 77bc4cc..0aaaa51 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -17,12 +17,16 @@ package org.apache.impala.planner; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.SlotDescriptor; @@ -35,6 +39,8 @@ import org.apache.impala.catalog.HdfsPartition; import org.apache.impala.catalog.HdfsPartition.FileBlock; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.Type; +import org.apache.impala.common.FileSystemUtil; +import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.NotImplementedException; import org.apache.impala.common.PrintUtils; @@ -122,6 +128,14 @@ public class HdfsScanNode extends ScanNode { // to values > 0 for hdfs text files. private int skipHeaderLineCount_ = 0; + // Number of scan-ranges/files/partitions that have missing disk ids. Reported in the + // explain plan. + private int numScanRangesNoDiskIds_ = 0; + private int numFilesNoDiskIds_ = 0; + private int numPartitionsNoDiskIds_ = 0; + + private static final Configuration CONF = new Configuration(); + /** * Construct a node to scan given data files into tuples described by 'desc', * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and @@ -308,7 +322,8 @@ public class HdfsScanNode extends ScanNode { * ids, based on the given maximum number of bytes each scan range should scan. * Returns the set of file formats being scanned. */ - private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer) { + private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer) + throws ImpalaRuntimeException { long maxScanRangeLength = analyzer.getQueryCtx().client_request.getQuery_options() .getMax_scan_range_length(); scanRanges_ = Lists.newArrayList(); @@ -316,7 +331,18 @@ public class HdfsScanNode extends ScanNode { for (HdfsPartition partition: partitions_) { fileFormats.add(partition.getFileFormat()); Preconditions.checkState(partition.getId() >= 0); + // Missing disk id accounting is only done for file systems that support the notion + // of disk/storage ids. 
+ FileSystem partitionFs; + try { + partitionFs = partition.getLocationPath().getFileSystem(CONF); + } catch (IOException e) { + throw new ImpalaRuntimeException("Error determining partition fs type", e); + } + boolean checkMissingDiskIds = FileSystemUtil.supportsStorageIds(partitionFs); + boolean partitionMissingDiskIds = false; for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) { + boolean fileDescMissingDiskIds = false; for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) { HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock); List<Integer> replicaHostIdxs = block.getReplicaHostIdxs(); @@ -337,6 +363,11 @@ public class HdfsScanNode extends ScanNode { // Translate from network address to the global (to this request) host index. Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress); location.setHost_idx(globalHostIdx); + if (checkMissingDiskIds && block.getDiskId(i) == -1) { + ++numScanRangesNoDiskIds_; + partitionMissingDiskIds = true; + fileDescMissingDiskIds = true; + } location.setVolume_id(block.getDiskId(i)); location.setIs_cached(block.isCached(i)); locations.add(location); @@ -362,7 +393,15 @@ public class HdfsScanNode extends ScanNode { currentOffset += currentLength; } } + if (fileDescMissingDiskIds) { + ++numFilesNoDiskIds_; + if (LOG.isTraceEnabled()) { + LOG.trace("File blocks mapping to unknown disk ids. Dir: " + + partition.getLocation() + " File:" + fileDesc.toString()); + } + } } + if (partitionMissingDiskIds) ++numPartitionsNoDiskIds_; } return fileFormats; } @@ -554,13 +593,13 @@ public class HdfsScanNode extends ScanNode { HdfsTable table = (HdfsTable) desc_.getTable(); output.append(String.format("%s%s [%s", prefix, getDisplayLabel(), getDisplayLabelDetail())); + int numPartitions = partitions_.size(); if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() && fragment_.isPartitioned()) { output.append(", " + fragment_.getDataPartition().getExplainString()); } output.append("]\n"); if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - int numPartitions = partitions_.size(); if (tbl_.getNumClusteringCols() == 0) numPartitions = 1; output.append(String.format("%spartitions=%s/%s files=%s size=%s", detailPrefix, numPartitions, table.getPartitions().size() - 1, totalFiles_, @@ -586,6 +625,12 @@ public class HdfsScanNode extends ScanNode { if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { output.append(getStatsExplainString(detailPrefix, detailLevel)); output.append("\n"); + if (numScanRangesNoDiskIds_ > 0) { + output.append(String.format("%smissing disk ids: " + + "partitions=%s/%s files=%s/%s scan ranges %s/%s\n", detailPrefix, + numPartitionsNoDiskIds_, numPartitions, numFilesNoDiskIds_, + totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size())); + } } return output.toString(); } @@ -663,4 +708,6 @@ public class HdfsScanNode extends ScanNode { @Override public boolean hasCorruptTableStats() { return hasCorruptTableStats_; } + + public boolean hasMissingDiskIds() { return numScanRangesNoDiskIds_ > 0; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fcc2d817/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index b47a7b3..cc8b39b 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java 
@@ -304,6 +304,16 @@ public class Planner { hasHeader = true; } + if (request.query_ctx.isSetTables_missing_diskids()) { + List<String> tableNames = Lists.newArrayList(); + for (TTableName tableName: request.query_ctx.getTables_missing_diskids()) { + tableNames.add(tableName.db_name + "." + tableName.table_name); + } + str.append("WARNING: The following tables have scan ranges with missing " + + "disk id information.\n" + Joiner.on(", ").join(tableNames) + "\n"); + hasHeader = true; + } + if (request.query_ctx.isDisable_spilling()) { str.append("WARNING: Spilling is disabled for this query as a safety guard.\n" + "Reason: Query option disable_unsafe_spills is set, at least one table\n" + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fcc2d817/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 1030111..a67ad0d 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -86,6 +86,7 @@ import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; import org.apache.impala.common.NotImplementedException; +import org.apache.impala.planner.HdfsScanNode; import org.apache.impala.planner.PlanFragment; import org.apache.impala.planner.Planner; import org.apache.impala.planner.ScanNode; @@ -947,14 +948,17 @@ public class Frontend { LOG.trace("get scan range locations"); Set<TTableName> tablesMissingStats = Sets.newTreeSet(); Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet(); + Set<TTableName> tablesWithMissingDiskIds = Sets.newTreeSet(); for (ScanNode scanNode: scanNodes) { result.putToPer_node_scan_ranges( scanNode.getId().asInt(), scanNode.getScanRangeLocations()); - if (scanNode.isTableMissingStats()) { - tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift()); - } - if (scanNode.hasCorruptTableStats()) { - tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift()); + + TTableName tableName = scanNode.getTupleDesc().getTableName().toThrift(); + if (scanNode.isTableMissingStats()) tablesMissingStats.add(tableName); + if (scanNode.hasCorruptTableStats()) tablesWithCorruptStats.add(tableName); + if (scanNode instanceof HdfsScanNode && + ((HdfsScanNode) scanNode).hasMissingDiskIds()) { + tablesWithMissingDiskIds.add(tableName); } } @@ -964,6 +968,9 @@ public class Frontend { for (TTableName tableName: tablesWithCorruptStats) { queryCtx.addToTables_with_corrupt_stats(tableName); } + for (TTableName tableName: tablesWithMissingDiskIds) { + queryCtx.addToTables_missing_diskids(tableName); + } // Compute resource requirements after scan range locations because the cost // estimates of scan nodes rely on them.
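
Because the new accounting is interleaved with scan-range construction in
HdfsScanNode.computeScanRangeLocations(), the following self-contained Java
sketch isolates just the counting rules. The class and method names are
invented for illustration; the real code operates on HdfsPartition,
FileDescriptor, and FileBlock objects and only performs the check when
FileSystemUtil.supportsStorageIds() returns true for the partition's
filesystem.

// Illustrative sketch only (not part of the patch): the disk-id accounting
// from HdfsScanNode.computeScanRangeLocations(), reduced to plain Java types.
import java.util.List;

public class MissingDiskIdCounter {
  // Counters reported in the explain plan as
  // "missing disk ids: partitions=x/y files=x/y scan ranges x/y".
  private int numScanRangesNoDiskIds = 0;
  private int numFilesNoDiskIds = 0;
  private int numPartitionsNoDiskIds = 0;

  /**
   * diskIdsPerFile holds, for each file in a partition, the disk id of every
   * block replica; -1 means the filesystem reported no disk/storage id.
   */
  public void countPartition(List<List<Integer>> diskIdsPerFile) {
    boolean partitionMissingDiskIds = false;
    for (List<Integer> replicaDiskIds : diskIdsPerFile) {
      boolean fileMissingDiskIds = false;
      for (int diskId : replicaDiskIds) {
        if (diskId == -1) {
          ++numScanRangesNoDiskIds;
          fileMissingDiskIds = true;
          partitionMissingDiskIds = true;
        }
      }
      if (fileMissingDiskIds) ++numFilesNoDiskIds;
    }
    if (partitionMissingDiskIds) ++numPartitionsNoDiskIds;
  }

  // Mirrors HdfsScanNode.hasMissingDiskIds(), which Frontend.java uses to add
  // the table to TQueryCtx.tables_missing_diskids.
  public boolean hasMissingDiskIds() { return numScanRangesNoDiskIds > 0; }
}

As in the patch, a disk id of -1 marks a replica whose storage id was not
reported, any such replica marks its file and partition as affected, and a
non-zero scan-range count is what triggers the per-node explain line and the
table's inclusion in tables_missing_diskids.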
