IMPALA-3441, IMPALA-3659: check for malformed Avro data

This patch adds error checking to the Avro scanner (both the codegen'd
and interpreted paths), including out-of-bounds checks and data
validity checks.

I ran a local benchmark using the following queries:
  set num_scanner_threads=1;
  select count(i) from default.avro_bigints_big; # file contains only longs
  select max(l_orderkey) from biglineitem_avro; # file has tpch.lineitem schema

Both benchmark queries see negligible or no performance impact.

This patch adds a new Avro scanner unit test and an end-to-end test
that queries several corrupted files, and updates the zig-zag
varlen int unit test.

Change-Id: I801a11c496a128e02c564c2a9c44baa5a97be132
Reviewed-on: http://gerrit.cloudera.org:8080/3072
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/01287a3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/01287a3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/01287a3b

Branch: refs/heads/master
Commit: 01287a3ba909f93a5bbe72081d5b9ec67db70257
Parents: c076f09
Author: Skye Wanderman-Milne <[email protected]>
Authored: Thu Jun 9 15:09:43 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Mon Jun 13 18:32:32 2016 -0700

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/base-sequence-scanner.cc            |  10 +
 be/src/exec/base-sequence-scanner.h             |   3 +
 be/src/exec/hdfs-avro-scanner-ir.cc             | 177 +++++---
 be/src/exec/hdfs-avro-scanner-test.cc           | 401 +++++++++++++++++++
 be/src/exec/hdfs-avro-scanner.cc                | 307 +++++++++-----
 be/src/exec/hdfs-avro-scanner.h                 |  86 ++--
 be/src/exec/hdfs-avro-table-writer.cc           |   3 +-
 be/src/exec/hdfs-scanner.cc                     |  21 +
 be/src/exec/hdfs-scanner.h                      |   3 +
 be/src/exec/read-write-util.cc                  |  67 +++-
 be/src/exec/read-write-util.h                   |  49 ++-
 be/src/exec/scanner-context.cc                  |  15 +-
 be/src/exec/scanner-context.h                   |   1 +
 be/src/exec/scanner-context.inline.h            |  24 +-
 be/src/exec/zigzag-test.cc                      | 107 ++++-
 common/thrift/generate_error_codes.py           |  20 +
 testdata/bad_avro_snap/README                   |  19 +
 testdata/bad_avro_snap/invalid_union.avro       | Bin 0 -> 191 bytes
 testdata/bad_avro_snap/negative_string_len.avro | Bin 0 -> 180 bytes
 testdata/bad_avro_snap/truncated_float.avro     | Bin 0 -> 175 bytes
 testdata/bad_avro_snap/truncated_string.avro    | Bin 0 -> 171 bytes
 .../functional/functional_schema_template.sql   |  20 +
 .../datasets/functional/schema_constraints.csv  |   2 +
 .../queries/DataErrorsTest/avro-errors.test     |  24 ++
 tests/common/test_result_verifier.py            |  13 +-
 tests/data_errors/test_data_errors.py           |  12 +
 27 files changed, 1143 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index c3208fc..876fc7e 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -98,3 +98,4 @@ ADD_BE_TEST(row-batch-list-test)
 ADD_BE_TEST(incr-stats-util-test)
 ADD_BE_TEST(kudu-scan-node-test)
 ADD_BE_TEST(kudu-table-sink-test)
+ADD_BE_TEST(hdfs-avro-scanner-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc 
b/be/src/exec/base-sequence-scanner.cc
index 4e0ac59..d235b77 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -22,6 +22,7 @@
 #include "runtime/string-search.h"
 #include "util/codec.h"
 #include "util/runtime-profile-counters.h"
+#include "util/test-info.h"
 
 #include "common/names.h"
 
@@ -71,6 +72,15 @@ BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, 
RuntimeState* state
     num_syncs_(0) {
 }
 
+BaseSequenceScanner::BaseSequenceScanner()
+  : HdfsScanner(),
+    header_(NULL),
+    block_start_(0),
+    total_block_size_(0),
+    num_syncs_(0) {
+  DCHECK(TestInfo::is_test());
+}
+
 BaseSequenceScanner::~BaseSequenceScanner() {
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h 
b/be/src/exec/base-sequence-scanner.h
index 555444a..ea7ad78 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -129,6 +129,9 @@ class BaseSequenceScanner : public HdfsScanner {
   /// If true, this scanner object is only for processing the header.
   bool only_parsing_header_;
 
+  /// Unit test constructor
+  BaseSequenceScanner();
+
  private:
   /// Set to true when this scanner has processed all the bytes it is 
responsible
   /// for, i.e., when it reads a sync occurring completely in the next scan

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc 
b/be/src/exec/hdfs-avro-scanner-ir.cc
index 84ca502..6839fe1 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -24,12 +24,16 @@ using namespace strings;
 
 // Functions in this file are cross-compiled to IR with clang.
 
+const int AVRO_FLOAT_SIZE = 4;
+const int AVRO_DOUBLE_SIZE = 8;
+
 int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** 
data,
-    Tuple* tuple, TupleRow* tuple_row) {
+    uint8_t* data_end, Tuple* tuple, TupleRow* tuple_row) {
   int num_to_commit = 0;
   for (int i = 0; i < max_tuples; ++i) {
     InitTuple(template_tuple_, tuple);
-    if (UNLIKELY(!MaterializeTuple(*avro_header_->schema.get(), pool, data, 
tuple))) {
+    if (UNLIKELY(!MaterializeTuple(*avro_header_->schema.get(), pool, data, 
data_end,
+        tuple))) {
       return 0;
     }
     tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
@@ -42,105 +46,160 @@ int HdfsAvroScanner::DecodeAvroData(int max_tuples, 
MemPool* pool, uint8_t** dat
   return num_to_commit;
 }
 
-bool HdfsAvroScanner::ReadUnionType(int null_union_position, uint8_t** data) {
+bool HdfsAvroScanner::ReadUnionType(int null_union_position, uint8_t** data,
+    uint8_t* data_end, bool* is_null) {
   DCHECK(null_union_position == 0 || null_union_position == 1);
+  if (UNLIKELY(*data == data_end)) {
+    SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK);
+    return false;
+  }
   int8_t union_position = **data;
   // Union position is varlen zig-zag encoded
-  DCHECK(union_position == 0 || union_position == 2);
+  if (UNLIKELY(union_position != 0 && union_position != 2)) {
+    SetStatusInvalidValue(TErrorCode::AVRO_INVALID_UNION, union_position);
+    return false;
+  }
   // "Decode" zig-zag encoding
   if (union_position == 2) union_position = 1;
   *data += 1;
-  return union_position != null_union_position;
+  *is_null = union_position == null_union_position;
+  return true;
 }
 
-void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool 
write_slot,
-    void* slot, MemPool* pool) {
+bool HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data,
+    uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) {
+  if (UNLIKELY(*data == data_end)) {
+    SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK);
+    return false;
+  }
   if (write_slot) {
     DCHECK_EQ(type, TYPE_BOOLEAN);
+    if (UNLIKELY(**data != 0 && **data != 1)) {
+      SetStatusInvalidValue(TErrorCode::AVRO_INVALID_BOOLEAN, **data);
+      return false;
+    }
     *reinterpret_cast<bool*>(slot) = *reinterpret_cast<bool*>(*data);
   }
   *data += 1;
+  return true;
 }
 
-void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool 
write_slot,
-    void* slot, MemPool* pool) {
-  int32_t val = ReadWriteUtil::ReadZInt(data);
+bool HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, 
uint8_t* data_end,
+    bool write_slot, void* slot, MemPool* pool) {
+  ReadWriteUtil::ZIntResult r = ReadWriteUtil::ReadZInt(data, data_end);
+  if (UNLIKELY(!r.ok)) {
+    SetStatusCorruptData(TErrorCode::SCANNER_INVALID_INT);
+    return false;
+  }
   if (write_slot) {
     if (type == TYPE_INT) {
-      *reinterpret_cast<int32_t*>(slot) = val;
+      *reinterpret_cast<int32_t*>(slot) = r.val;
     } else if (type == TYPE_BIGINT) {
-      *reinterpret_cast<int64_t*>(slot) = val;
+      *reinterpret_cast<int64_t*>(slot) = r.val;
     } else if (type == TYPE_FLOAT) {
-      *reinterpret_cast<float*>(slot) = val;
-    } else if (type == TYPE_DOUBLE) {
-      *reinterpret_cast<double*>(slot) = val;
+      *reinterpret_cast<float*>(slot) = r.val;
     } else {
-      DCHECK(false);
+      DCHECK_EQ(type, TYPE_DOUBLE);
+      *reinterpret_cast<double*>(slot) = r.val;
     }
   }
+  return true;
 }
 
-void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool 
write_slot,
-    void* slot, MemPool* pool) {
-  int64_t val = ReadWriteUtil::ReadZLong(data);
+bool HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, 
uint8_t* data_end,
+    bool write_slot, void* slot, MemPool* pool) {
+  ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(data, data_end);
+  if (UNLIKELY(!r.ok)) {
+    SetStatusCorruptData(TErrorCode::SCANNER_INVALID_INT);
+    return false;
+  }
   if (write_slot) {
     if (type == TYPE_BIGINT) {
-      *reinterpret_cast<int64_t*>(slot) = val;
+      *reinterpret_cast<int64_t*>(slot) = r.val;
     } else if (type == TYPE_FLOAT) {
-      *reinterpret_cast<float*>(slot) = val;
-    } else if (type == TYPE_DOUBLE) {
-      *reinterpret_cast<double*>(slot) = val;
+      *reinterpret_cast<float*>(slot) = r.val;
     } else {
-      DCHECK(false);
+      DCHECK_EQ(type, TYPE_DOUBLE);
+      *reinterpret_cast<double*>(slot) = r.val;
     }
   }
+  return true;
 }
 
-void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool 
write_slot,
-    void* slot, MemPool* pool) {
+bool HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, 
uint8_t* data_end,
+    bool write_slot, void* slot, MemPool* pool) {
+  if (UNLIKELY(data_end - *data < AVRO_FLOAT_SIZE)) {
+    SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK);
+    return false;
+  }
   if (write_slot) {
     float val = *reinterpret_cast<float*>(*data);
     if (type == TYPE_FLOAT) {
       *reinterpret_cast<float*>(slot) = val;
-    } else if (type == TYPE_DOUBLE) {
-      *reinterpret_cast<double*>(slot) = val;
     } else {
-      DCHECK(false);
+      DCHECK_EQ(type, TYPE_DOUBLE);
+      *reinterpret_cast<double*>(slot) = val;
     }
   }
-  *data += 4;
+  *data += AVRO_FLOAT_SIZE;
+  return true;
 }
 
-void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool 
write_slot,
-    void* slot, MemPool* pool) {
+bool HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, 
uint8_t* data_end,
+    bool write_slot, void* slot, MemPool* pool) {
+  if (UNLIKELY(data_end - *data < AVRO_DOUBLE_SIZE)) {
+    SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK);
+    return false;
+  }
   if (write_slot) {
     DCHECK_EQ(type, TYPE_DOUBLE);
     *reinterpret_cast<double*>(slot) = *reinterpret_cast<double*>(*data);
   }
-  *data += 8;
+  *data += AVRO_DOUBLE_SIZE;
+  return true;
 }
 
-void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, 
uint8_t** data,
-    bool write_slot, void* slot, MemPool* pool) {
-  int64_t len = ReadWriteUtil::ReadZLong(data);
+ReadWriteUtil::ZLongResult HdfsAvroScanner::ReadFieldLen(uint8_t** data, 
uint8_t* data_end) {
+  ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(data, data_end);
+  if (UNLIKELY(!r.ok)) {
+    SetStatusCorruptData(TErrorCode::SCANNER_INVALID_INT);
+    return ReadWriteUtil::ZLongResult::error();
+  }
+  if (UNLIKELY(r.val < 0)) {
+    SetStatusInvalidValue(TErrorCode::AVRO_INVALID_LENGTH, r.val);
+    return ReadWriteUtil::ZLongResult::error();
+  }
+  if (UNLIKELY(data_end - *data < r.val)) {
+    SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK);
+    return ReadWriteUtil::ZLongResult::error();
+  }
+  return r;
+}
+
+bool HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, 
uint8_t** data,
+    uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) {
+  ReadWriteUtil::ZLongResult len = ReadFieldLen(data, data_end);
+  if (UNLIKELY(!len.ok)) return false;
   if (write_slot) {
     DCHECK(type == TYPE_VARCHAR);
     StringValue* sv = reinterpret_cast<StringValue*>(slot);
-    int str_len = std::min(static_cast<int>(len), max_len);
+    int str_len = std::min(static_cast<int>(len.val), max_len);
     DCHECK(str_len >= 0);
     sv->len = str_len;
     sv->ptr = reinterpret_cast<char*>(*data);
   }
-  *data += len;
+  *data += len.val;
+  return true;
 }
 
 bool HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** 
data,
-    bool write_slot, void* slot, MemPool* pool) {
-  int64_t len = ReadWriteUtil::ReadZLong(data);
+    uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) {
+  ReadWriteUtil::ZLongResult len = ReadFieldLen(data, data_end);
+  if (UNLIKELY(!len.ok)) return false;
   if (write_slot) {
     DCHECK(type == TYPE_CHAR);
     ColumnType ctype = ColumnType::CreateCharType(max_len);
-    int str_len = std::min(static_cast<int>(len), max_len);
+    int str_len = std::min(static_cast<int>(len.val), max_len);
     if (ctype.IsVarLenStringType()) {
       StringValue* sv = reinterpret_cast<StringValue*>(slot);
       sv->ptr = reinterpret_cast<char*>(pool->TryAllocate(max_len));
@@ -158,35 +217,42 @@ bool HdfsAvroScanner::ReadAvroChar(PrimitiveType type, 
int max_len, uint8_t** da
       StringValue::PadWithSpaces(reinterpret_cast<char*>(slot), max_len, 
str_len);
     }
   }
-  *data += len;
+  *data += len.val;
   return true;
 }
 
-void HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data,
-    bool write_slot, void* slot, MemPool* pool) {
-  int64_t len = ReadWriteUtil::ReadZLong(data);
+bool HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data,
+    uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) {
+  ReadWriteUtil::ZLongResult len = ReadFieldLen(data, data_end);
+  if (UNLIKELY(!len.ok)) return false;
   if (write_slot) {
     DCHECK(type == TYPE_STRING);
     StringValue* sv = reinterpret_cast<StringValue*>(slot);
-    sv->len = len;
+    sv->len = len.val;
     sv->ptr = reinterpret_cast<char*>(*data);
   }
-  *data += len;
+  *data += len.val;
+  return true;
 }
 
-void HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data,
-    bool write_slot, void* slot, MemPool* pool) {
-  int64_t len = ReadWriteUtil::ReadZLong(data);
+bool HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data,
+    uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) {
+  ReadWriteUtil::ZLongResult len = ReadFieldLen(data, data_end);
+  if (UNLIKELY(!len.ok)) return false;
   if (write_slot) {
+    DCHECK_GE(len.val, 0);
+    if (UNLIKELY(len.val > slot_byte_size)) {
+      SetStatusInvalidValue(TErrorCode::AVRO_INVALID_LENGTH, len.val);
+      return false;
+    }
     // Decimals are encoded as big-endian integers. Copy the decimal into the 
most
     // significant bytes and then shift down to the correct position to 
sign-extend the
     // decimal.
-    DCHECK_LE(len, slot_byte_size);
-    int bytes_to_fill = slot_byte_size - len;
+    int bytes_to_fill = slot_byte_size - len.val;
 #if __BYTE_ORDER == __LITTLE_ENDIAN
-    BitUtil::ByteSwap(reinterpret_cast<uint8_t*>(slot) + bytes_to_fill, *data, 
len);
+    BitUtil::ByteSwap(reinterpret_cast<uint8_t*>(slot) + bytes_to_fill, *data, 
len.val);
 #else
-    memcpy(slot, *data, len);
+    memcpy(slot, *data, len.val);
 #endif
     switch (slot_byte_size) {
       case 4: {
@@ -208,5 +274,6 @@ void HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, 
uint8_t** data,
         DCHECK(false) << "Decimal slots can't be this size: " << 
slot_byte_size;
     }
   }
-  *data += len;
+  *data += len.val;
+  return true;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-scanner-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-test.cc 
b/be/src/exec/hdfs-avro-scanner-test.cc
new file mode 100644
index 0000000..e5f77f3
--- /dev/null
+++ b/be/src/exec/hdfs-avro-scanner-test.cc
@@ -0,0 +1,401 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "exec/hdfs-avro-scanner.h"
+
+#include <gtest/gtest.h>
+#include <limits.h>
+
+#include "common/init.h"
+#include "exec/read-write-util.h"
+#include "runtime/decimal-value.inline.h"
+#include "runtime/runtime-state.h"
+#include "runtime/string-value.inline.h"
+
+// TODO: CHAR, VARCHAR (IMPALA-3658)
+
+namespace impala {
+
+class HdfsAvroScannerTest : public testing::Test {
+ public:
+  void TestReadUnionType(int null_union_position, uint8_t* data, int64_t 
data_len,
+      bool expected_is_null, TErrorCode::type expected_error = TErrorCode::OK) 
{
+    // Reset parse_status_
+    scanner_.parse_status_ = Status::OK();
+
+    uint8_t* new_data = data;
+    bool is_null = -1;
+    bool expect_success = expected_error == TErrorCode::OK;
+
+    bool success = scanner_.ReadUnionType(null_union_position, &new_data, data 
+ data_len,
+        &is_null);
+    EXPECT_EQ(success, expect_success);
+
+    if (success) {
+      EXPECT_TRUE(scanner_.parse_status_.ok());
+      EXPECT_EQ(is_null, expected_is_null);
+      EXPECT_EQ(new_data, data + 1);
+    } else {
+      EXPECT_EQ(scanner_.parse_status_.code(), expected_error);
+    }
+  }
+
+  // Templated function for calling different ReadAvro* functions.
+  //
+  // PrimitiveType is a template parameter so we can pass in int 
'slot_byte_size' to
+  // ReadAvroDecimal, but otherwise this argument is always the PrimitiveType 
'type'
+  // argument.
+  template<typename T, typename ReadAvroTypeFn, typename PrimitiveType>
+  void TestReadAvroType(ReadAvroTypeFn read_fn, PrimitiveType type, uint8_t* 
data,
+      int64_t data_len, T expected_val, int expected_encoded_len,
+      TErrorCode::type expected_error = TErrorCode::OK) {
+    // Reset parse_status_
+    scanner_.parse_status_ = Status::OK();
+
+    uint8_t* new_data = data;
+    T slot;
+    bool expect_success = expected_error == TErrorCode::OK;
+
+    bool success = (scanner_.*read_fn)(type, &new_data, data + data_len, true, 
&slot,
+        NULL);
+    EXPECT_EQ(success, expect_success);
+
+    if (success) {
+      EXPECT_TRUE(scanner_.parse_status_.ok());
+      EXPECT_EQ(slot, expected_val);
+      EXPECT_EQ(new_data, data + expected_encoded_len);
+    } else {
+      EXPECT_EQ(scanner_.parse_status_.code(), expected_error);
+    }
+  }
+
+  void TestReadAvroBoolean(uint8_t* data, int64_t data_len, bool expected_val,
+      TErrorCode::type expected_error = TErrorCode::OK) {
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroBoolean, TYPE_BOOLEAN, data, 
data_len,
+        expected_val, 1, expected_error);
+  }
+
+  void TestReadAvroInt32(uint8_t* data, int64_t data_len, int32_t expected_val,
+      int expected_encoded_len, TErrorCode::type expected_error = 
TErrorCode::OK) {
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroInt32, TYPE_INT, data, data_len,
+        expected_val, expected_encoded_len, expected_error);
+    // Test type promotion to long, float, and double
+    int64_t expected_bigint = expected_val;
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroInt32, TYPE_BIGINT, data, 
data_len,
+        expected_bigint, expected_encoded_len, expected_error);
+    float expected_float = expected_val;
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroInt32, TYPE_FLOAT, data, 
data_len,
+        expected_float, expected_encoded_len, expected_error);
+    double expected_double = expected_val;
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroInt32, TYPE_DOUBLE, data, 
data_len,
+        expected_double, expected_encoded_len, expected_error);
+  }
+
+  void TestReadAvroInt64(uint8_t* data, int64_t data_len, int64_t expected_val,
+      int expected_encoded_len, TErrorCode::type expected_error = 
TErrorCode::OK) {
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroInt64, TYPE_BIGINT, data, 
data_len,
+        expected_val, expected_encoded_len, expected_error);
+    // Test type promotion to float and double
+    float expected_float = expected_val;
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroInt64, TYPE_FLOAT, data, 
data_len,
+        expected_float, expected_encoded_len, expected_error);
+    double expected_double = expected_val;
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroInt64, TYPE_DOUBLE, data, 
data_len,
+        expected_double, expected_encoded_len, expected_error);
+  }
+
+  void TestReadAvroFloat(uint8_t* data, int64_t data_len, float expected_val,
+      TErrorCode::type expected_error = TErrorCode::OK) {
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroFloat, TYPE_FLOAT, data, 
data_len,
+        expected_val, 4, expected_error);
+    // Test type promotion to double
+    double expected_double = expected_val;
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroFloat, TYPE_DOUBLE, data, 
data_len,
+        expected_double, 4, expected_error);
+  }
+
+  void TestReadAvroDouble(uint8_t* data, int64_t data_len, double expected_val,
+      TErrorCode::type expected_error = TErrorCode::OK) {
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroDouble, TYPE_DOUBLE, data, 
data_len,
+        expected_val, 8, expected_error);
+  }
+
+  void TestReadAvroString(uint8_t* data, int64_t data_len, StringValue 
expected_val,
+      int expected_encoded_len, TErrorCode::type expected_error = 
TErrorCode::OK) {
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroString, TYPE_STRING, data, 
data_len,
+        expected_val, expected_encoded_len, expected_error);
+  }
+
+  template<typename T>
+  void TestReadAvroDecimal(uint8_t* data, int64_t data_len, DecimalValue<T> 
expected_val,
+      int expected_encoded_len, TErrorCode::type expected_error = 
TErrorCode::OK) {
+    TestReadAvroType(&HdfsAvroScanner::ReadAvroDecimal, sizeof(expected_val), 
data,
+        data_len, expected_val, expected_encoded_len, expected_error);
+  }
+
+  void TestInt64Val(int64_t val) {
+    uint8_t data[100];
+    int len = ReadWriteUtil::PutZLong(val, data);
+    DCHECK_GT(len, 0);
+    TestReadAvroInt64(data, len, val, len);
+    TestReadAvroInt64(data, len + 1, val, len);
+    TestReadAvroInt64(data, len - 1, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+  }
+
+ protected:
+  HdfsAvroScanner scanner_;
+};
+
+// Tests reading a [<some type>, "null"] union.
+TEST_F(HdfsAvroScannerTest, NullUnionTest) {
+  uint8_t data[100];
+  data[0] = 0;
+  TestReadUnionType(0, data, 1, true);
+  TestReadUnionType(1, data, 1, false);
+  TestReadUnionType(0, data, 10, true);
+  TestReadUnionType(1, data, 10, false);
+  TestReadUnionType(0, data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  data[0] = 2;
+  TestReadUnionType(0, data, 1, false);
+  TestReadUnionType(1, data, 1, true);
+  TestReadUnionType(0, data, 10, false);
+  TestReadUnionType(1, data, 10, true);
+  TestReadUnionType(0, data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  data[0] = 1;
+  TestReadUnionType(0, data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadUnionType(0, data, 1, false, TErrorCode::AVRO_INVALID_UNION);
+
+  data[0] = -1;
+  TestReadUnionType(0, data, 1, false, TErrorCode::AVRO_INVALID_UNION);
+}
+
+TEST_F(HdfsAvroScannerTest, BooleanTest) {
+  uint8_t data[100];
+  data[0] = 0;
+  TestReadAvroBoolean(data, 1, false);
+  TestReadAvroBoolean(data, 10, false);
+  TestReadAvroBoolean(data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  data[0] = 1;
+  TestReadAvroBoolean(data, 1, true);
+  TestReadAvroBoolean(data, 10, true);
+  TestReadAvroBoolean(data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  data[0] = 2;
+  TestReadAvroBoolean(data, 1, false, TErrorCode::AVRO_INVALID_BOOLEAN);
+}
+
+TEST_F(HdfsAvroScannerTest, Int32Test) {
+  uint8_t data[100];
+  data[0] = 1; // decodes to -1
+  TestReadAvroInt32(data, 1, -1, 1);
+  TestReadAvroInt32(data, 10, -1, 1);
+  TestReadAvroInt32(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+
+  data[0] = 2; // decodes to 1
+  TestReadAvroInt32(data, 1, 1, 1);
+  TestReadAvroInt32(data, 10, 1, 1);
+  TestReadAvroInt32(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+
+  data[0] = 0x80; // decodes to 64
+  data[1] = 0x01;
+  TestReadAvroInt32(data, 2, 64, 2);
+  TestReadAvroInt32(data, 10, 64, 2);
+  TestReadAvroInt32(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroInt32(data, 1, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+
+  int len = ReadWriteUtil::PutZInt(INT_MAX, data);
+  TestReadAvroInt32(data, len, INT_MAX, len);
+  TestReadAvroInt32(data, len + 1, INT_MAX, len);
+  TestReadAvroInt32(data, len - 1, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+
+  len = ReadWriteUtil::PutZInt(INT_MIN, data);
+  TestReadAvroInt32(data, len, INT_MIN, len);
+  TestReadAvroInt32(data, len + 1, INT_MIN, len);
+  TestReadAvroInt32(data, len - 1, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+
+  // TODO: we don't handle invalid values (e.g. overflow) (IMPALA-3659)
+}
+
+TEST_F(HdfsAvroScannerTest, Int64Test) {
+  uint8_t data[100];
+  data[0] = 1; // decodes to -1
+  TestReadAvroInt64(data, 1, -1, 1);
+  TestReadAvroInt64(data, 10, -1, 1);
+  TestReadAvroInt64(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+
+  data[0] = 2; // decodes to 1
+  TestReadAvroInt64(data, 1, 1, 1);
+  TestReadAvroInt64(data, 10, 1, 1);
+  TestReadAvroInt64(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+
+  data[0] = 0x80; // decodes to 64
+  data[1] = 0x01;
+  TestReadAvroInt64(data, 2, 64, 2);
+  TestReadAvroInt64(data, 10, 64, 2);
+  TestReadAvroInt64(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroInt64(data, 1, -1, -1, TErrorCode::SCANNER_INVALID_INT);
+
+  TestInt64Val(INT_MAX);
+  TestInt64Val(INT_MIN);
+  TestInt64Val(LLONG_MAX);
+  TestInt64Val(LLONG_MIN);
+
+  // TODO: we don't handle invalid values (e.g. overflow) (IMPALA-3659)
+}
+
+TEST_F(HdfsAvroScannerTest, FloatTest) {
+  uint8_t data[100];
+  float f = 1.23456789;
+  memcpy(data, &f, sizeof(float));
+  TestReadAvroFloat(data, 4, f);
+  TestReadAvroFloat(data, 10, f);
+  TestReadAvroFloat(data, 0, f, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroFloat(data, 1, f, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroFloat(data, 2, f, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroFloat(data, 3, f, TErrorCode::AVRO_TRUNCATED_BLOCK);
+}
+
+TEST_F(HdfsAvroScannerTest, DoubleTest) {
+  uint8_t data[100];
+  double d = 1.23456789012345;
+  memcpy(data, &d, sizeof(double));
+  TestReadAvroDouble(data, 8, d);
+  TestReadAvroDouble(data, 10, d);
+  TestReadAvroDouble(data, 0, d, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroDouble(data, 1, d, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroDouble(data, 2, d, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroDouble(data, 3, d, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroDouble(data, 7, d, TErrorCode::AVRO_TRUNCATED_BLOCK);
+}
+
+TEST_F(HdfsAvroScannerTest, StringTest) {
+  uint8_t data[100];
+  const char* s = "hello";
+  DCHECK_EQ(strlen(s), 5);
+  data[0] = 10; // decodes to 5
+  memcpy(&data[1], s, 5);
+  StringValue sv(s);
+  TestReadAvroString(data, 6, sv, 6);
+  TestReadAvroString(data, 10, sv, 6);
+  TestReadAvroString(data, 0, sv, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroString(data, 1, sv, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroString(data, 5, sv, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  data[0] = 1; // decodes to -1
+  TestReadAvroString(data, 10, sv, -1, TErrorCode::AVRO_INVALID_LENGTH);
+
+  data[0] = 0; // decodes to 0
+  sv.len = 0;
+  TestReadAvroString(data, 1, sv, 1);
+  TestReadAvroString(data, 10, sv, 1);
+}
+
+TEST_F(HdfsAvroScannerTest, DecimalTest) {
+  uint8_t data[100];
+  Decimal4Value d4v(123);
+  // Unscaled value (123) can be stored in 1 byte
+  data[0] = 2; // decodes to 1
+  data[1] = 123;
+  TestReadAvroDecimal(data, 2, d4v, 2);
+  TestReadAvroDecimal(data, 10, d4v, 2);
+  TestReadAvroDecimal(data, 0, d4v, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroDecimal(data, 1, d4v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  data[0] = 10; // decodes to 5
+  TestReadAvroDecimal(data, 10, d4v, -1, TErrorCode::AVRO_INVALID_LENGTH);
+
+  data[0] = 1; // decodes to -1
+  TestReadAvroDecimal(data, 10, d4v, -1, TErrorCode::AVRO_INVALID_LENGTH);
+
+  data[0] = 0; // decodes to 0
+  TestReadAvroDecimal(data, 10, Decimal4Value(0), 1);
+
+  data[0] = 0x80; // decodes to 64
+  data[1] = 0x01;
+  TestReadAvroDecimal(data, 100, d4v, -1, TErrorCode::AVRO_INVALID_LENGTH);
+
+  d4v = Decimal4Value(123456789);
+  // Unscaled value can be stored in 4 bytes
+  data[0] = 8; // decodes to 4
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  BitUtil::ByteSwap(&data[1], &d4v.value(), 4);
+#else
+  memcpy(&data[1], &d4v.value(), 4);
+#endif
+  TestReadAvroDecimal(data, 5, d4v, 5);
+  TestReadAvroDecimal(data, 10, d4v, 5);
+  TestReadAvroDecimal(data, 0, d4v, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroDecimal(data, 1, d4v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroDecimal(data, 4, d4v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  Decimal8Value d8v(1);
+  data[0] = 2; // decodes to 1
+  data[1] = 1;
+  TestReadAvroDecimal(data, 2, d8v, 2);
+  TestReadAvroDecimal(data, 10, d8v, 2);
+  TestReadAvroDecimal(data, 0, d8v, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroDecimal(data, 1, d8v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  d8v = Decimal8Value(123456789012345678);
+  data[0] = 16; // decodes to 8
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  BitUtil::ByteSwap(&data[1], &d8v.value(), 8);
+#else
+  memcpy(&data[1], &d8v.value(), 8);
+#endif
+  TestReadAvroDecimal(data, 9, d8v, 9);
+  TestReadAvroDecimal(data, 10, d8v, 9);
+  TestReadAvroDecimal(data, 0, d8v, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroDecimal(data, 1, d8v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroDecimal(data, 7, d8v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  Decimal16Value d16v(1234567890);
+  data[0] = 10; // decodes to 5
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  BitUtil::ByteSwap(&data[1], &d16v.value(), 5);
+#else
+  memcpy(&data[1], &d16v.value(), 5);
+#endif
+  TestReadAvroDecimal(data, 6, d16v, 6);
+  TestReadAvroDecimal(data, 10, d16v, 6);
+  TestReadAvroDecimal(data, 0, d16v, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroDecimal(data, 1, d16v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroDecimal(data, 4, d16v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+
+  bool overflow;
+  d16v = Decimal16Value::FromDouble(38, 0, .1e38d, &overflow);
+  DCHECK(!overflow);
+  data[0] = 32; // decodes to 16
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  BitUtil::ByteSwap(&data[1], &d16v.value(), 16);
+#else
+  memcpy(&data[1], &d16v.value(), 16);
+#endif
+  TestReadAvroDecimal(data, 17, d16v, 17);
+  TestReadAvroDecimal(data, 20, d16v, 17);
+  TestReadAvroDecimal(data, 0, d16v, -1, TErrorCode::SCANNER_INVALID_INT);
+  TestReadAvroDecimal(data, 1, d16v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+  TestReadAvroDecimal(data, 16, d16v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK);
+}
+
+}
+
+int main(int argc, char **argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index ec8ec26..70dff83 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -27,6 +27,7 @@
 #include "util/codec.h"
 #include "util/decompress.h"
 #include "util/runtime-profile-counters.h"
+#include "util/test-info.h"
 
 #include "common/names.h"
 
@@ -57,6 +58,13 @@ HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state)
     codegend_decode_avro_data_(NULL) {
 }
 
+HdfsAvroScanner::HdfsAvroScanner()  // Argument-less constructor intended for unit tests only.
+  : BaseSequenceScanner(),
+    avro_header_(NULL),
+    codegend_decode_avro_data_(NULL) {  // Starts with no file header and no codegen'd decoder.
+  DCHECK(TestInfo::is_test());  // Guards against use outside a test process.
+}
+
 Status HdfsAvroScanner::Prepare(ScannerContext* context) {
   RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context));
   if (scan_node_->avro_schema().schema == NULL) {
@@ -112,7 +120,10 @@ Status HdfsAvroScanner::ParseMetadata() {
 
   int64_t num_entries;
   RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_));
-  if (num_entries < 1) return Status("File header metadata has no data");
+  if (num_entries < 1) {
+    return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(),
+        num_entries, stream_->file_offset());
+  }
 
   while (num_entries != 0) {
     DCHECK_GT(num_entries, 0);
@@ -122,7 +133,10 @@ Status HdfsAvroScanner::ParseMetadata() {
       uint8_t* key_buf;
       int64_t key_len;
       RETURN_IF_FALSE(stream_->ReadZLong(&key_len, &parse_status_));
-      DCHECK_GE(key_len, 0);
+      if (key_len < 0) {
+        return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), 
key_len,
+            stream_->file_offset());
+      }
       RETURN_IF_FALSE(stream_->ReadBytes(key_len, &key_buf, &parse_status_));
       key = string(reinterpret_cast<char*>(key_buf), key_len);
 
@@ -130,7 +144,10 @@ Status HdfsAvroScanner::ParseMetadata() {
       uint8_t* value;
       int64_t value_len;
       RETURN_IF_FALSE(stream_->ReadZLong(&value_len, &parse_status_));
-      DCHECK_GE(value_len, 0);
+      if (value_len < 0) {
+        return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), 
value_len,
+            stream_->file_offset());
+      }
       RETURN_IF_FALSE(stream_->ReadBytes(value_len, &value, &parse_status_));
 
       if (key == AVRO_SCHEMA_KEY) {
@@ -173,6 +190,10 @@ Status HdfsAvroScanner::ParseMetadata() {
       }
     }
     RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_));
+    if (num_entries < 0) {
+      return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, 
stream_->filename(),
+          num_entries, stream_->file_offset());
+    }
   }
 
   VLOG_FILE << stream_->filename() << ": "
@@ -461,13 +482,22 @@ Status HdfsAvroScanner::ProcessRange() {
     uint8_t* compressed_data;
     int64_t compressed_size;
     uint8_t* data;
+    int64_t data_len;
+    uint8_t* data_end;
 
     // Read new data block
     RETURN_IF_FALSE(
         stream_->ReadZLong(&num_records, &parse_status_));
+    if (num_records < 0) {
+      return Status(TErrorCode::AVRO_INVALID_RECORD_COUNT, stream_->filename(),
+          num_records, stream_->file_offset());
+    }
     DCHECK_GE(num_records, 0);
     RETURN_IF_FALSE(stream_->ReadZLong(&compressed_size, &parse_status_));
-    DCHECK_GE(compressed_size, 0);
+    if (compressed_size < 0) {
+      return Status(TErrorCode::AVRO_INVALID_COMPRESSED_SIZE, 
stream_->filename(),
+          compressed_size, stream_->file_offset());
+    }
     RETURN_IF_FALSE(stream_->ReadBytes(
         compressed_size, &compressed_data, &parse_status_));
 
@@ -477,14 +507,15 @@ Status HdfsAvroScanner::ProcessRange() {
         // decompressor_ doesn't expect this
         compressed_size -= SnappyDecompressor::TRAILING_CHECKSUM_LEN;
       }
-      int64_t size;
       SCOPED_TIMER(decompress_timer_);
       RETURN_IF_ERROR(decompressor_->ProcessBlock(false, compressed_size, 
compressed_data,
-                                                  &size, &data));
-      VLOG_FILE << "Decompressed " << compressed_size << " to " << size;
+          &data_len, &data));
+      VLOG_FILE << "Decompressed " << compressed_size << " to " << data_len;
     } else {
       data = compressed_data;
+      data_len = compressed_size;
     }
+    data_end = data + data_len;
 
     // Process block data
     while (num_records > 0) {
@@ -501,10 +532,11 @@ Status HdfsAvroScanner::ProcessRange() {
         num_to_commit = WriteEmptyTuples(context_, tuple_row, max_tuples);
       } else {
         if (codegend_decode_avro_data_ != NULL) {
-          num_to_commit = codegend_decode_avro_data_(
-              this, max_tuples, pool, &data, tuple, tuple_row);
+          num_to_commit = codegend_decode_avro_data_(this, max_tuples, pool, 
&data,
+              data_end, tuple, tuple_row);
         } else {
-          num_to_commit = DecodeAvroData(max_tuples, pool, &data, tuple, 
tuple_row);
+          num_to_commit = DecodeAvroData(max_tuples, pool, &data, data_end, 
tuple,
+              tuple_row);
         }
       }
       RETURN_IF_ERROR(parse_status_);
@@ -525,9 +557,11 @@ Status HdfsAvroScanner::ProcessRange() {
 }
 
 bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
-    MemPool* pool, uint8_t** data, Tuple* tuple) {
+    MemPool* pool, uint8_t** data, uint8_t* data_end, Tuple* tuple) {
   DCHECK_EQ(record_schema.schema->type, AVRO_RECORD);
   for (const AvroSchemaElement& element: record_schema.children) {
+    DCHECK_LE(*data, data_end);
+
     const SlotDescriptor* slot_desc = element.slot_desc;
     bool write_slot = false;
     void* slot = NULL;
@@ -539,41 +573,45 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
     }
 
     avro_type_t type = element.schema->type;
-    if (element.nullable() && !ReadUnionType(element.null_union_position, 
data)) {
-      type = AVRO_NULL;
+    if (element.nullable()) {
+      bool is_null;
+      if (!ReadUnionType(element.null_union_position, data, data_end, 
&is_null)) {
+        return false;
+      }
+      if (is_null) type = AVRO_NULL;
     }
 
+    bool success;
     switch (type) {
       case AVRO_NULL:
         if (slot_desc != NULL) 
tuple->SetNull(slot_desc->null_indicator_offset());
+        success = true;
         break;
       case AVRO_BOOLEAN:
-        ReadAvroBoolean(slot_type, data, write_slot, slot, pool);
+        success = ReadAvroBoolean(slot_type, data, data_end, write_slot, slot, 
pool);
         break;
       case AVRO_INT32:
-        ReadAvroInt32(slot_type, data, write_slot, slot, pool);
+        success = ReadAvroInt32(slot_type, data, data_end, write_slot, slot, 
pool);
         break;
       case AVRO_INT64:
-        ReadAvroInt64(slot_type, data, write_slot, slot, pool);
+        success = ReadAvroInt64(slot_type, data, data_end, write_slot, slot, 
pool);
         break;
       case AVRO_FLOAT:
-        ReadAvroFloat(slot_type, data, write_slot, slot, pool);
+        success = ReadAvroFloat(slot_type, data, data_end, write_slot, slot, 
pool);
         break;
       case AVRO_DOUBLE:
-        ReadAvroDouble(slot_type, data, write_slot, slot, pool);
+        success = ReadAvroDouble(slot_type, data, data_end, write_slot, slot, 
pool);
         break;
       case AVRO_STRING:
       case AVRO_BYTES:
         if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
-          ReadAvroVarchar(slot_type, slot_desc->type().len, data, write_slot, 
slot, pool);
+          success = ReadAvroVarchar(slot_type, slot_desc->type().len, data, 
data_end,
+              write_slot, slot, pool);
         } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
-          if (UNLIKELY(!ReadAvroChar(slot_type, slot_desc->type().len, data, 
write_slot,
-                           slot, pool))) {
-            DCHECK(!parse_status_.ok());
-            return false;
-          }
+          success = ReadAvroChar(slot_type, slot_desc->type().len, data, 
data_end,
+              write_slot, slot, pool);
         } else {
-          ReadAvroString(slot_type, data, write_slot, slot, pool);
+          success = ReadAvroString(slot_type, data, data_end, write_slot, 
slot, pool);
         }
         break;
       case AVRO_DECIMAL: {
@@ -582,19 +620,41 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
           DCHECK_EQ(slot_type, TYPE_DECIMAL);
           slot_byte_size = slot_desc->type().GetByteSize();
         }
-        ReadAvroDecimal(slot_byte_size, data, write_slot, slot, pool);
+        success = ReadAvroDecimal(slot_byte_size, data, data_end, write_slot, 
slot, pool);
         break;
       }
       case AVRO_RECORD:
-        MaterializeTuple(element, pool, data, tuple);
+        success = MaterializeTuple(element, pool, data, data_end, tuple);
         break;
       default:
         DCHECK(false) << "Unsupported SchemaElement: " << type;
     }
+    if (UNLIKELY(!success)) {
+      DCHECK(!parse_status_.ok());
+      return false;
+    }
   }
   return true;
 }
 
+void HdfsAvroScanner::SetStatusCorruptData(TErrorCode::type error_code) {  // Stores 'error_code' in parse_status_.
+  DCHECK(parse_status_.ok());  // Expects that no earlier error is still recorded.
+  if (TestInfo::is_test()) {  // Tests: presumably no stream_ exists, so fixed placeholders are used.
+    parse_status_ = Status(error_code, "test file", 123);
+  } else {  // Otherwise report the file name and offset taken from stream_.
+    parse_status_ = Status(error_code, stream_->filename(), 
stream_->file_offset());
+  }
+}
+
+void HdfsAvroScanner::SetStatusInvalidValue(TErrorCode::type error_code, 
int64_t len) {  // Stores 'error_code' in parse_status_, with 'len' as an extra message argument.
+  DCHECK(parse_status_.ok());  // Expects that no earlier error is still recorded.
+  if (TestInfo::is_test()) {  // Tests: presumably no stream_ exists, so fixed placeholders are used.
+    parse_status_ = Status(error_code, "test file", len, 123);
+  } else {  // Otherwise report the file name and offset taken from stream_.
+    parse_status_ = Status(error_code, stream_->filename(), len, 
stream_->file_offset());
+  }
+}
+
 // This function produces a codegen'd function equivalent to 
MaterializeTuple() but
 // optimized for the table schema. Via helper functions CodegenReadRecord() and
 // CodegenReadScalar(), it eliminates the conditionals necessary when 
interpreting the
@@ -602,76 +662,95 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
 // the schema. Example output with tpch.region:
 //
 // define i1 @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this,
-//     %"struct.impala::AvroSchemaElement"* %record_schema,
-//     %"class.impala::MemPool"* %pool, i8** %data, %"class.impala::Tuple"* 
%tuple) {
+//   %"struct.impala::AvroSchemaElement"* %record_schema, 
%"class.impala::MemPool"* %pool,
+//   i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple) #33 {
 // entry:
-//   %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32,
-//       %"struct.impala::StringValue", %"struct.impala::StringValue" }*
-//   %is_not_null = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
-//       %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
-//   br i1 %is_not_null, label %read_field, label %null_field
+//   %is_null_ptr = alloca i1
+//   %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, [3 x i8], 
i32,
+//     %"struct.impala::StringValue", %"struct.impala::StringValue" }*
+//   %0 = bitcast i1* %is_null_ptr to i8*
+//   %read_union_ok = call i1 
@_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb(
+//     %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* 
%data_end, i8* %0)
+//   br i1 %read_union_ok, label %read_union_ok1, label %bail_out
 //
-// read_field:                                       ; preds = %entry
-//   %slot = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
-//       %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 1
+// read_union_ok1:                                   ; preds = %entry
+//   %is_null = load i1, i1* %is_null_ptr
+//   br i1 %is_null, label %null_field, label %read_field
+//
+// read_field:                                       ; preds = %read_union_ok1
+//   %slot = getelementptr inbounds { i8, [3 x i8], i32, 
%"struct.impala::StringValue",
+//    %"struct.impala::StringValue" }, { i8, [3 x i8], i32, 
%"struct.impala::StringValue",
+//    %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 2
 //   %opaque_slot = bitcast i32* %slot to i8*
-//   call void
-//    
@_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
-//        %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, i1 true,
-//        i8* %opaque_slot, %"class.impala::MemPool"* %pool)
-//   br label %end_field
+//   %success = call i1
+//   
@_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE(
+//     %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, i8* 
%data_end,
+//     i1 true, i8* %opaque_slot, %"class.impala::MemPool"* %pool)
+//   br i1 %success, label %end_field, label %bail_out
 //
-// null_field:                                       ; preds = %entry
-//   call void @SetNull({ i8, i32, %"struct.impala::StringValue",
-//       %"struct.impala::StringValue" }* %tuple_ptr)
+// null_field:                                       ; preds = %read_union_ok1
+//   call void @SetNull({ i8, [3 x i8], i32, %"struct.impala::StringValue",
+//     %"struct.impala::StringValue" }* %tuple_ptr)
 //   br label %end_field
 //
 // end_field:                                        ; preds = %read_field, 
%null_field
-//  %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
-//      %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
-//  br i1 %is_not_null4, label %read_field1, label %null_field3
+//   %1 = bitcast i1* %is_null_ptr to i8*
+//   %read_union_ok4 = call i1 
@_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb(
+//     %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* 
%data_end, i8* %1)
+//   br i1 %read_union_ok4, label %read_union_ok5, label %bail_out
+//
+// read_union_ok5:                                   ; preds = %end_field
+//   %is_null7 = load i1, i1* %is_null_ptr
+//   br i1 %is_null7, label %null_field6, label %read_field2
+//
+// read_field2:                                      ; preds = %read_union_ok5
+//   %slot8 = getelementptr inbounds { i8, [3 x i8], i32, 
%"struct.impala::StringValue",
+//    %"struct.impala::StringValue" }, { i8, [3 x i8], i32, 
%"struct.impala::StringValue",
+//    %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 3
+//   %opaque_slot9 = bitcast %"struct.impala::StringValue"* %slot8 to i8*
+//   %success10 = call i1
+//  
@_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE(
+//     %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i8* 
%data_end,
+//     i1 true, i8* %opaque_slot9, %"class.impala::MemPool"* %pool)
+//   br i1 %success10, label %end_field3, label %bail_out
 //
-// read_field1:                                      ; preds = %end_field
-//  %slot5 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
-//      %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 2
-//  %opaque_slot6 = bitcast %"struct.impala::StringValue"* %slot5 to i8*
-//  call void
-//   
@_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
-//       %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true,
-//       i8* %opaque_slot6, %"class.impala::MemPool"* %pool)
-//  br label %end_field2
+// null_field6:                                      ; preds = %read_union_ok5
+//   call void @SetNull.1({ i8, [3 x i8], i32, %"struct.impala::StringValue",
+//     %"struct.impala::StringValue" }* %tuple_ptr)
+//   br label %end_field3
 //
-// null_field3:                                      ; preds = %end_field
-//   call void @SetNull1({ i8, i32, %"struct.impala::StringValue",
-//       %"struct.impala::StringValue" }* %tuple_ptr)
-//   br label %end_field2
+// end_field3:                                       ; preds = %read_field2, 
%null_field6
+//   %2 = bitcast i1* %is_null_ptr to i8*
+//   %read_union_ok13 = call i1 
@_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb(
+//     %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* 
%data_end, i8* %2)
+//   br i1 %read_union_ok13, label %read_union_ok14, label %bail_out
 //
-// end_field2:                                       ; preds = %read_field1, 
%null_field3
-//   %is_not_null10 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
-//       %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
-//   br i1 %is_not_null10, label %read_field7, label %null_field9
+// read_union_ok14:                                  ; preds = %end_field3
+//   %is_null16 = load i1, i1* %is_null_ptr
+//   br i1 %is_null16, label %null_field15, label %read_field11
 //
-// read_field7:                                      ; preds = %end_field2
-//   %slot11 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue",
-//       %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 3
-//   %opaque_slot12 = bitcast %"struct.impala::StringValue"* %slot11 to i8*
-//   call void
-//    
@_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE(
-//        %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true,
-//        i8* %opaque_slot12, %"class.impala::MemPool"* %pool)
-//   br label %end_field8
+// read_field11:                                     ; preds = %read_union_ok14
+//   %slot17 = getelementptr inbounds { i8, [3 x i8], i32, 
%"struct.impala::StringValue",
+//    %"struct.impala::StringValue" }, { i8, [3 x i8], i32, 
%"struct.impala::StringValue",
+//    %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 4
+//   %opaque_slot18 = bitcast %"struct.impala::StringValue"* %slot17 to i8*
+//   %success19 = call i1
+//  
@_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE(
+//     %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i8* 
%data_end,
+//     i1 true, i8* %opaque_slot18, %"class.impala::MemPool"* %pool)
+//   br i1 %success19, label %end_field12, label %bail_out
 //
-// null_field9:                                      ; preds = %end_field2
-//   call void @SetNull2({ i8, i32, %"struct.impala::StringValue",
-//       %"struct.impala::StringValue" }* %tuple_ptr)
-//   br label %end_field8
+// null_field15:                                     ; preds = %read_union_ok14
+//   call void @SetNull.2({ i8, [3 x i8], i32, %"struct.impala::StringValue",
+//     %"struct.impala::StringValue" }* %tuple_ptr)
+//   br label %end_field12
 //
-// end_field8:                                       ; preds = %read_field7, 
%null_field9
+// end_field12:                                    ; preds = %read_field11, 
%null_field15
 //   ret i1 true
 //
-// bail_out:                                         ; No predecessors!
-//   ret i1 false                                    // used only if there is 
CHAR.
-//}
+// bail_out:           ; preds = %read_field11, %end_field3, %read_field2, 
%end_field,
+//   ret i1 false      ;         %read_field, %entry
+// }
 Function* HdfsAvroScanner::CodegenMaterializeTuple(
     HdfsScanNode* node, LlvmCodeGen* codegen) {
   LLVMContext& context = codegen->context();
@@ -698,15 +777,17 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
   prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", 
schema_element_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("data", data_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("data_end", 
codegen->ptr_type()));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", 
tuple_opaque_ptr_type));
-  Value* args[5];
+  Value* args[6];
   Function* fn = prototype.GeneratePrototype(&builder, args);
 
   Value* this_val = args[0];
   // Value* record_schema_val = args[1]; // don't need this
   Value* pool_val = args[2];
   Value* data_val = args[3];
-  Value* opaque_tuple_val = args[4];
+  Value* data_end_val = args[4];
+  Value* opaque_tuple_val = args[5];
 
   Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, 
"tuple_ptr");
 
@@ -715,7 +796,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
 
   Status status = CodegenReadRecord(
       SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, 
bail_out_block,
-      bail_out_block, this_val, pool_val, tuple_val, data_val);
+      bail_out_block, this_val, pool_val, tuple_val, data_val, data_end_val);
   if (!status.ok()) {
     VLOG_QUERY << status.GetDetail();
     fn->eraseFromParent();
@@ -736,7 +817,7 @@ Status HdfsAvroScanner::CodegenReadRecord(
     const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* 
node,
     LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* 
insert_before,
     BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val,
-    Value* data_val) {
+    Value* data_val, Value* data_end_val) {
   if (record.schema == NULL) {
     return Status("Missing Avro schema in scan node. This could be due to 
stale "
         "metadata. Running 'invalidate metadata <tablename>' may resolve the 
problem.");
@@ -748,6 +829,9 @@ Status HdfsAvroScanner::CodegenReadRecord(
 
   // Codegen logic for parsing each field and, if necessary, populating a slot 
with the
   // result.
+
+  // Used to store result of ReadUnionType() call
+  Value* is_null_ptr = NULL;
   for (int i = 0; i < record.children.size(); ++i) {
     const AvroSchemaElement* field = &record.children[i];
     int col_idx = i;
@@ -775,14 +859,25 @@ Status HdfsAvroScanner::CodegenReadRecord(
 
     if (field->nullable()) {
       // Field could be null. Create conditional branch based on ReadUnionType 
result.
-      null_block = BasicBlock::Create(context, "null_field", fn, 
end_field_block);
-      Function* read_union_fn =
-          codegen->GetFunction(IRFunction::READ_UNION_TYPE, false);
+      Function* read_union_fn = 
codegen->GetFunction(IRFunction::READ_UNION_TYPE, false);
       Value* null_union_pos_val =
           codegen->GetIntConstant(TYPE_INT, field->null_union_position);
-      Value* is_not_null_val = builder->CreateCall(read_union_fn,
-          ArrayRef<Value*>({this_val, null_union_pos_val, data_val}), 
"is_not_null");
-      builder->CreateCondBr(is_not_null_val, read_field_block, null_block);
+      if (is_null_ptr == NULL) {
+        is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, 
codegen->boolean_type(),
+            "is_null_ptr");
+      }
+      Value* is_null_ptr_cast = builder->CreateBitCast(is_null_ptr, 
codegen->ptr_type());
+      Value* read_union_ok = builder->CreateCall(read_union_fn,
+          ArrayRef<Value*>({this_val, null_union_pos_val, data_val, 
data_end_val,
+          is_null_ptr_cast}), "read_union_ok");
+      BasicBlock* read_union_ok_block = BasicBlock::Create(context, 
"read_union_ok", fn,
+          read_field_block);
+      builder->CreateCondBr(read_union_ok, read_union_ok_block, bail_out);
+
+      builder->SetInsertPoint(read_union_ok_block);
+      null_block = BasicBlock::Create(context, "null_field", fn, 
end_field_block);
+      Value* is_null = builder->CreateLoad(is_null_ptr, "is_null");
+      builder->CreateCondBr(is_null, null_block, read_field_block);
 
       // Write null field IR
       builder->SetInsertPoint(null_block);
@@ -800,16 +895,18 @@ Status HdfsAvroScanner::CodegenReadRecord(
 
     // Write read_field_block IR
     builder->SetInsertPoint(read_field_block);
+    Value* ret_val;
     if (field->schema->type == AVRO_RECORD) {
       BasicBlock* insert_before_block =
           (null_block != NULL) ? null_block : end_field_block;
       RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, node, codegen, 
builder, fn,
-          insert_before_block, bail_out, this_val, pool_val, tuple_val, 
data_val));
+          insert_before_block, bail_out, this_val, pool_val, tuple_val, 
data_val,
+          data_end_val));
     } else {
       RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder,
-          end_field_block, bail_out, this_val, pool_val, tuple_val, data_val));
+          this_val, pool_val, tuple_val, data_val, data_end_val, &ret_val));
     }
-    builder->CreateBr(end_field_block);
+    builder->CreateCondBr(ret_val, end_field_block, bail_out);
 
     // Set insertion point for next field.
     builder->SetInsertPoint(end_field_block);
@@ -819,8 +916,8 @@ Status HdfsAvroScanner::CodegenReadRecord(
 
 Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
     SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
-    BasicBlock* end_field_block, BasicBlock* bail_out_block, Value* this_val,
-    Value* pool_val, Value* tuple_val, Value* data_val) {
+    Value* this_val, Value* pool_val, Value* tuple_val, Value* data_val,
+    Value* data_end_val, Value** ret_val) {
   LlvmCodeGen::LlvmBuilder* builder =
       reinterpret_cast<LlvmCodeGen::LlvmBuilder*>(void_builder);
   Function* read_field_fn;
@@ -882,18 +979,12 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
     // Need to pass an extra argument (the length) to the codegen function.
     Value* fixed_len = builder->getInt32(slot_desc->type().len);
     Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val,
-                                write_slot_val, opaque_slot_val, pool_val};
-    if (slot_desc->type().type == TYPE_VARCHAR) {
-      builder->CreateCall(read_field_fn, read_field_args);
-    } else {
-      // ReadAvroChar() returns false if allocation from MemPool fails.
-      Value* ret_val = builder->CreateCall(read_field_fn, read_field_args);
-      builder->CreateCondBr(ret_val, end_field_block, bail_out_block);
-    }
+                                data_end_val, write_slot_val, opaque_slot_val, 
pool_val};
+    *ret_val = builder->CreateCall(read_field_fn, read_field_args, "success");
   } else {
-    Value* read_field_args[] =
-        {this_val, slot_type_val, data_val, write_slot_val, opaque_slot_val, 
pool_val};
-    builder->CreateCall(read_field_fn, read_field_args);
+    Value* read_field_args[] = {this_val, slot_type_val, data_val, 
data_end_val,
+                                write_slot_val, opaque_slot_val, pool_val};
+    *ret_val = builder->CreateCall(read_field_fn, read_field_args, "success");
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index 316f697..2069a9b 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -68,6 +68,8 @@
 #include "exec/base-sequence-scanner.h"
 
 #include <avro/basics.h>
+
+#include "exec/read-write-util.h"
 #include "runtime/tuple.h"
 #include "runtime/tuple-row.h"
 
@@ -105,6 +107,8 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   }
 
  private:
+  friend class HdfsAvroScannerTest;
+
   struct AvroFileHeader : public BaseSequenceScanner::FileHeader {
     /// The root of the file schema tree (i.e. the top-level record schema of 
the file)
     ScopedAvroSchemaElement schema;
@@ -131,7 +135,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   static const std::string AVRO_SNAPPY_CODEC;
   static const std::string AVRO_DEFLATE_CODEC;
 
-  typedef int (*DecodeAvroDataFn)(HdfsAvroScanner*, int, MemPool*, uint8_t**,
+  typedef int (*DecodeAvroDataFn)(HdfsAvroScanner*, int, MemPool*, uint8_t**, 
uint8_t*,
                                   Tuple*, TupleRow*);
 
   /// The codegen'd version of DecodeAvroData() if available, NULL otherwise.
@@ -172,17 +176,18 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// Returns the number of tuples to be committed.
   /// - max_tuples: the maximum number of tuples to write
   /// - data: serialized record data. Is advanced as records are read.
+  /// - data_end: pointer to the end of the data buffer (i.e. the first 
invalid byte).
   /// - pool: memory pool to allocate string data from
   /// - tuple: tuple pointer to copy objects to
   /// - tuple_row: tuple row of written tuples
-  int DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data,
+  int DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data, uint8_t* 
data_end,
       Tuple* tuple, TupleRow* tuple_row);
 
   /// Materializes a single tuple from serialized record data. Will return 
false and set
   /// error in parse_status_ if memory limit is exceeded when allocating new 
char buffer.
   /// See comments below for ReadAvroChar().
   bool MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool,
-      uint8_t** data, Tuple* tuple);
+      uint8_t** data, uint8_t* data_end, Tuple* tuple);
 
   /// Produces a version of DecodeAvroData that uses codegen'd instead of 
interpreted
   /// functions.
@@ -208,63 +213,80 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   ///     the bail_out block or some basic blocks before that.
   /// - bail_out: the block to jump to if anything fails. This is used in 
particular by
   ///     ReadAvroChar() which can exceed memory limit during allocation from 
MemPool.
-  /// - this_val, pool_val, tuple_val, data_val: arguments to 
MaterializeTuple()
+  /// - this_val, pool_val, tuple_val, data_val, data_end_val: arguments to
+  ///     MaterializeTuple()
   static Status CodegenReadRecord(
       const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* 
node,
       LlvmCodeGen* codegen, void* builder, llvm::Function* fn,
       llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out, 
llvm::Value* this_val,
-      llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val);
+      llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val,
+      llvm::Value* data_end_val);
 
   /// Creates the IR for reading an Avro scalar at builder's current insert 
point.
   static Status CodegenReadScalar(const AvroSchemaElement& element,
       SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
-      llvm::BasicBlock* end_field_block, llvm::BasicBlock* bail_out_block,
       llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
-      llvm::Value* data_val);
+      llvm::Value* data_val, llvm::Value* data_end_val, llvm::Value** ret_val);
 
   /// The following are cross-compiled functions for parsing a serialized Avro 
primitive
   /// type and writing it to a slot. They can also be used for skipping a 
field without
   /// writing it to a slot by setting 'write_slot' to false.
   /// - data: Serialized record data. Is advanced past the read field.
+  /// - data_end: pointer to the end of the data buffer (i.e. the first 
invalid byte).
   /// The following arguments are used only if 'write_slot' is true:
   /// - slot: The tuple slot to write the parsed field into.
   /// - type: The type of the slot. (This is necessary because there is not a 
1:1 mapping
   ///         between Avro types and Impala's primitive types.)
   /// - pool: MemPool for string data.
   ///
-  /// ReadAvroChar() will return false and set error in parse_status_ if 
memory limit
-  /// is exceeded when allocating the new char buffer. It returns true 
otherwise.
+  /// All return false and set parse_status_ on error (e.g. mem limit exceeded 
when
+  /// allocating buffer, malformed data), and return true otherwise.
   ///
-  void ReadAvroBoolean(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, 
MemPool* pool);
-  void ReadAvroInt32(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, 
MemPool* pool);
-  void ReadAvroInt64(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, 
MemPool* pool);
-  void ReadAvroFloat(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, 
MemPool* pool);
-  void ReadAvroDouble(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, 
MemPool* pool);
-  void ReadAvroVarchar(
-      PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* 
slot,
-      MemPool* pool);
-  bool ReadAvroChar(
-      PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* 
slot,
-      MemPool* pool);
-  void ReadAvroString(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, 
MemPool* pool);
+  bool ReadAvroBoolean(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroInt32(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroInt64(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroFloat(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroDouble(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data, 
uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data, uint8_t* 
data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroString(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+
+  /// Helper function for some of the above. Reads the length of certain varlen types
+  /// and updates 'data'. On success the returned result is marked ok and carries the
+  /// length; on error the result is not ok and parse_status_ is updated.
+  ReadWriteUtil::ZLongResult ReadFieldLen(uint8_t** data, uint8_t* data_end);
 
   /// Same as the above functions, except takes the size of the decimal slot 
(i.e. 4, 8, or
   /// 16) instead of the type (which should be TYPE_DECIMAL). The slot size is 
passed
   /// explicitly, rather than passing a ColumnType, so we can easily pass in a 
constant in
   /// the codegen'd MaterializeTuple() function. If 'write_slot' is false, 
'slot_byte_size'
   /// is ignored.
-  void ReadAvroDecimal(
-      int slot_byte_size, uint8_t** data, bool write_slot, void* slot, 
MemPool* pool);
+  bool ReadAvroDecimal(
+      int slot_byte_size, uint8_t** data, uint8_t* data_end, bool write_slot, 
void* slot,
+      MemPool* pool);
+
+  /// Reads and advances 'data' past the union branch index and sets 'is_null' 
according
+  /// to if the corresponding element is null. 'null_union_position' must be 0 
or
+  /// 1. Returns false and sets parse_status_ if there's an error, otherwise 
returns true.
+  bool ReadUnionType(int null_union_position, uint8_t** data, uint8_t* 
data_end,
+      bool* is_null);
+
+  /// Helper functions to set parse_status_ outside of xcompiled functions. This is to
+  /// avoid including string construction, etc. in the IR, which both bloats it and can
+  /// contain exception handling code.
+  void SetStatusCorruptData(TErrorCode::type error_code);
+  void SetStatusInvalidValue(TErrorCode::type error_code, int64_t len);
 
-  /// Reads and advances 'data' past the union branch index and returns true 
if the
-  /// corresponding element is non-null. 'null_union_position' must be 0 or 1.
-  bool ReadUnionType(int null_union_position, uint8_t** data);
+  /// Unit test constructor
+  HdfsAvroScanner();
 
   static const char* LLVM_CLASS_NAME;
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc 
b/be/src/exec/hdfs-avro-table-writer.cc
index 9ee704d..f3bc3cb 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -71,7 +71,8 @@ inline void HdfsAvroTableWriter::AppendField(const 
ColumnType& type, const void*
   // Each avro field is written as union, which is a ZLong indicating the union
   // field followed by the encoded value. Impala/Hive always stores values as
   // a union of [ColumnType, NULL].
-  // TODO check if we want to support [NULL, ColumnType] union
+  // TODO: the writer may be asked to write [NULL, ColumnType] unions. It is 
wrong
+  // for us to assume [ColumnType, NULL].
 
   if (value == NULL) {
     // indicate the second field of the union

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 74a5efb..78d4994 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -41,6 +41,7 @@
 #include "util/runtime-profile-counters.h"
 #include "util/sse-util.h"
 #include "util/string-parser.h"
+#include "util/test-info.h"
 #include "gen-cpp/PlanNodes_types.h"
 
 #include "common/names.h"
@@ -71,6 +72,26 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, 
RuntimeState* state)
       write_tuples_fn_(NULL) {
 }
 
+HdfsScanner::HdfsScanner()  // Unit-test constructor: takes no scan node or state.
+    : scan_node_(NULL),
+      state_(NULL),
+      context_(NULL),
+      stream_(NULL),
+      scanner_conjunct_ctxs_(NULL),
+      template_tuple_(NULL),
+      tuple_byte_size_(-1),
+      tuple_(NULL),
+      batch_(NULL),
+      tuple_mem_(NULL),
+      num_null_bytes_(-1),
+      parse_status_(Status::OK()),
+      decompression_type_(THdfsCompression::NONE),
+      data_buffer_pool_(NULL),
+      decompress_timer_(NULL),
+      write_tuples_fn_(NULL) {
+  DCHECK(TestInfo::is_test());  // Only legal when TestInfo reports a test run.
+}
+
 HdfsScanner::~HdfsScanner() {
   DCHECK(batch_ == NULL);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 9206e77..7f804f1 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -420,6 +420,9 @@ class HdfsScanner {
   /// WriteCompleteTuple() because it's easier than writing IR to access
   /// scanner_conjunct_ctxs_.
   ExprContext* GetConjunctCtx(int idx) const;
+
+  /// Unit test constructor
+  HdfsScanner();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/read-write-util.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.cc b/be/src/exec/read-write-util.cc
index cc0cd0a..22e3abd 100644
--- a/be/src/exec/read-write-util.cc
+++ b/be/src/exec/read-write-util.cc
@@ -18,23 +18,76 @@
 
 using namespace impala;
 
+namespace {
+
+// Returns MAX_ZLONG_LEN + 1 if the encoded int is more than MAX_ZLONG_LEN bytes long,
+// otherwise returns the length of the encoded int. Always reads MAX_ZLONG_LEN bytes
+// from 'buf', so the caller must guarantee that many bytes are readable.
+//
+// The first eight bytes are loaded as one word and byte i is tested at bits
+// [8i, 8i+7]; the remaining two bytes are loaded as a 16-bit word. This matches the
+// byte order in memory only on a little-endian host, and 'buf' is read without any
+// alignment adjustment.
+int FindZIntegerLength(uint8_t* buf) {
+  // A byte whose top bit is clear terminates the encoding. The mask is unsigned so
+  // that shifting it up to bit 63 never overflows a signed type.
+  const uint64_t continuation_bit = 0x80;
+  const int word_bytes = sizeof(uint64_t);
+  uint64_t x = *reinterpret_cast<uint64_t*>(buf);
+  for (int i = 0; i < word_bytes; ++i) {
+    if ((x & (continuation_bit << (i * 8))) == 0) return i + 1;
+  }
+  // Bytes 9 and 10 of the encoding.
+  uint16_t y = *reinterpret_cast<uint16_t*>(buf + word_bytes);
+  if ((y & 0x80) == 0) return 9;
+  if ((y & 0x8000) == 0) return 10;
+  return 11;
+}
+
+// Slow path for ReadZInteger() that checks for out-of-bounds on every byte.
+// - buf: in/out cursor. It is advanced past every byte consumed, including when an
+//   error is returned.
+// - buf_end: first byte that must not be read.
+// Returns ZResultType::error() if 'buf_end' is reached before the terminating byte, or
+// if MAX_LEN bytes were consumed and the last one still had its continuation bit set.
+// Otherwise returns the zig-zag decoded value.
+template <int MAX_LEN, typename ZResultType>
+ZResultType ReadZIntegerSlow(uint8_t** buf, uint8_t* buf_end) {
+  uint64_t zlong = 0;
+  int shift = 0;
+  bool more = true;
+  for (int i = 0; more && i < MAX_LEN; ++i) {
+    if (UNLIKELY(*buf >= buf_end)) return ZResultType::error();
+    // Shifting a 64-bit value by 64 or more is undefined, so the bound is strict.
+    DCHECK_LT(shift, 64);
+    zlong |= static_cast<uint64_t>(**buf & 0x7f) << shift;
+    shift += 7;
+    more = (**buf & 0x80) != 0;
+    ++(*buf);
+  }
+  // Invalid int that's longer than maximum
+  if (UNLIKELY(more)) return ZResultType::error();
+  // Undo the zig-zag mapping: the low bit carries the sign.
+  return ZResultType((zlong >> 1) ^ -(zlong & 1));
+}
+
+}
+
 // This function is not inlined because it can potentially cause LLVM to crash 
(see
 // http://llvm.org/bugs/show_bug.cgi?id=19315), and inlining does not appear 
to have any
 // performance impact.
-int64_t ReadWriteUtil::ReadZLong(uint8_t** buf) {
+template <int MAX_LEN, typename ZResultType>
+ZResultType ReadWriteUtil::ReadZInteger(uint8_t** buf, uint8_t* buf_end) {
+  DCHECK(MAX_LEN == MAX_ZINT_LEN || MAX_LEN == MAX_ZLONG_LEN);
+
+  // Use MAX_ZLONG_LEN rather than MAX_LEN since FindZIntegerLength() always 
assumes at
+  // least MAX_ZLONG_LEN bytes in buffer.
+  if (UNLIKELY(buf_end - *buf < MAX_ZLONG_LEN)) {
+    return ReadZIntegerSlow<MAX_LEN, ZResultType>(buf, buf_end);
+  }
+  // Once we get here, we don't need to worry about going off end of buffer.
+  int num_bytes = FindZIntegerLength(*buf);
+  if (UNLIKELY(num_bytes > MAX_LEN)) return ZResultType::error();
+
   uint64_t zlong = 0;
   int shift = 0;
-  bool more;
-  do {
-    DCHECK_LE(shift, 64);
+  for (int i = 0; i < num_bytes; ++i) {
     zlong |= static_cast<uint64_t>(**buf & 0x7f) << shift;
     shift += 7;
-    more = (**buf & 0x80) != 0;
     ++(*buf);
-  } while (more);
-  return (zlong >> 1) ^ -(zlong & 1);
+  }
+  return ZResultType((zlong >> 1) ^ -(zlong & 1));
 }
 
+// Instantiate the template for long and int.
+template ReadWriteUtil::ZLongResult
+ReadWriteUtil::ReadZInteger<ReadWriteUtil::MAX_ZLONG_LEN, 
ReadWriteUtil::ZLongResult>(
+    uint8_t** buf, uint8_t* buf_end);
+template ReadWriteUtil::ZIntResult
+ReadWriteUtil::ReadZInteger<ReadWriteUtil::MAX_ZINT_LEN, 
ReadWriteUtil::ZIntResult>(
+    uint8_t** buf, uint8_t* buf_end);
+
 int ReadWriteUtil::PutZInt(int32_t integer, uint8_t* buf) {
   // Move the sign bit to the first bit.
   uint32_t uinteger = (integer << 1) ^ (integer >> 31);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/read-write-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h
index 2321939..fa9de9b 100644
--- a/be/src/exec/read-write-util.h
+++ b/be/src/exec/read-write-util.h
@@ -84,13 +84,43 @@ class ReadWriteUtil {
   /// Determines the total length in bytes of a Writable VInt/VLong from the 
first byte.
   static int DecodeVIntSize(int8_t byte);
 
+  /// Return values for ReadZLong() and ReadZInt(). We return these in a single struct,
+  /// rather than using an output parameter, for performance (this way both values are
+  /// returned as registers).
+  template <typename T>
+  struct ZResult {
+    /// False if there was a problem reading the value.
+    bool ok;
+    /// The decoded value. Only meaningful if 'ok' is true; zero otherwise.
+    T val;
+
+    /// Wraps a successfully decoded value.
+    ZResult(T v) : ok(true), val(v) { }
+    /// Returns a result flagged as failed.
+    static ZResult error() { return ZResult(); }
+
+   private:
+    /// Only reachable through error(). 'val' is value-initialized so that returning or
+    /// copying an error result never reads an indeterminate value.
+    ZResult() : ok(false), val() { }
+  };
+
+  typedef ZResult<int64_t> ZLongResult;
+  typedef ZResult<int32_t> ZIntResult;
+
   /// Read a zig-zag encoded long. This is the integer encoding defined by 
google.com
-  /// protocol-buffers: 
https://developers.google.com/protocol-buffers/docs/encoding
-  /// *buf is incremented past the encoded long.
-  static int64_t ReadZLong(uint8_t** buf);
+  /// protocol-buffers: 
https://developers.google.com/protocol-buffers/docs/encoding. *buf
+  /// is incremented past the encoded long. 'buf_end' should point to the end 
of 'buf'
+  /// (i.e. the first invalid byte).
+  ///
+  /// Returns a non-OK result if the encoded int spans too many bytes or runs past
+  /// 'buf_end'. The decoded value is unspecified for inputs that have the correct
+  /// number of bytes but overflow the destination type (for both long and int, there
+  /// are extra bits in the highest-order byte).
+  static inline ZLongResult ReadZLong(uint8_t** buf, uint8_t* buf_end) {
+    return ReadZInteger<MAX_ZLONG_LEN, ZLongResult>(buf, buf_end);
+  }
+
 
   /// Read a zig-zag encoded int.
-  static int32_t ReadZInt(uint8_t** buf);
+  static inline ZIntResult ReadZInt(uint8_t** buf, uint8_t* buf_end) {
+    return ReadZInteger<MAX_ZINT_LEN, ZIntResult>(buf, buf_end);
+  }
 
   /// The following methods read data from a buffer without assuming the 
buffer is long
   /// enough. If the buffer isn't long enough or another error occurs, they 
return false
@@ -104,6 +134,12 @@ class ReadWriteUtil {
 
   /// Skip the next num_bytes bytes.
   static bool SkipBytes(uint8_t** buf, int* buf_len, int num_bytes, Status* 
status);
+
+ private:
+  /// Implementation for ReadZLong() and ReadZInt(). MAX_LEN is MAX_ZLONG_LEN 
or
+  /// MAX_ZINT_LEN.
+  template<int MAX_LEN, typename ZResult>
+  static ZResult ReadZInteger(uint8_t** buf, uint8_t* buf_end);
 };
 
 template<>
@@ -213,11 +249,6 @@ inline int64_t ReadWriteUtil::PutVInt(int32_t val, 
uint8_t* buf) {
   return PutVLong(val, buf);
 }
 
-inline int32_t ReadWriteUtil::ReadZInt(uint8_t** buf) {
-  int64_t zlong = ReadZLong(buf);
-  return static_cast<int32_t>(zlong);
-}
-
 template <class T>
 inline bool ReadWriteUtil::Read(uint8_t** buf, int* buf_len, T* val, Status* 
status) {
   int val_len = sizeof(T);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 833436c..3c78d88 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -121,7 +121,12 @@ void 
ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don
 Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
 
-  // io_buffer_ should only be null the first time this is called
+  // Nothing to do if we've already processed all data in the file
+  int64_t offset = file_offset() + boundary_buffer_bytes_left_;
+  int64_t file_bytes_remaining = file_desc()->file_length - offset;
+  if (io_buffer_ == NULL && file_bytes_remaining == 0) return Status::OK();
+
+  // Otherwise, io_buffer_ should only be null the first time this is called
   DCHECK(io_buffer_ != NULL ||
          (total_bytes_returned_ == 0 && completed_io_buffers_.empty()));
 
@@ -140,11 +145,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t 
read_past_size) {
     RETURN_IF_ERROR(scan_range_->GetNext(&io_buffer_));
   } else {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
-    int64_t offset = file_offset() + boundary_buffer_bytes_left_;
 
     int64_t read_past_buffer_size = read_past_size_cb_.empty() ?
         DEFAULT_READ_PAST_SIZE : read_past_size_cb_(offset);
-    int64_t file_bytes_remaining = file_desc()->file_length - offset;
     read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
     read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
     // We're reading past the scan range. Be careful not to read past the end 
of file.
@@ -258,7 +261,7 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t 
requested_len,
 
   // We have enough bytes in io_buffer_ or couldn't read more bytes
   int64_t requested_bytes_left = requested_len - boundary_buffer_bytes_left_;
-  DCHECK_GE(requested_len, 0);
+  DCHECK_GE(requested_bytes_left, 0);
   int64_t num_bytes = min(io_buffer_bytes_left_, requested_bytes_left);
   *out_len = boundary_buffer_bytes_left_ + num_bytes;
   DCHECK_LE(*out_len, requested_len);
@@ -306,3 +309,7 @@ Status ScannerContext::Stream::ReportIncompleteRead(int64_t 
length, int64_t byte
 Status ScannerContext::Stream::ReportInvalidRead(int64_t length) {
   return Status(TErrorCode::SCANNER_INVALID_READ, length, filename(), 
file_offset());
 }
+
+Status ScannerContext::Stream::ReportInvalidInt() {
+  return Status(TErrorCode::SCANNER_INVALID_INT, filename(), file_offset());
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index e1dee97..5d6ea06 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -241,6 +241,7 @@ class ScannerContext {
     /// Error-reporting functions.
     Status ReportIncompleteRead(int64_t length, int64_t bytes_read);
     Status ReportInvalidRead(int64_t length);
+    Status ReportInvalidInt();
   };
 
   bool HasStream() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/scanner-context.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.inline.h 
b/be/src/exec/scanner-context.inline.h
index 1a36b66..1b12b52 100644
--- a/be/src/exec/scanner-context.inline.h
+++ b/be/src/exec/scanner-context.inline.h
@@ -146,16 +146,20 @@ inline bool ScannerContext::Stream::ReadVLong(int64_t* 
value, Status* status) {
 }
 
 inline bool ScannerContext::Stream::ReadZLong(int64_t* value, Status* status) {
-  uint64_t zlong = 0;
-  int shift = 0;
-  uint8_t* byte;
-  do {
-    DCHECK_LE(shift, 64);
-    RETURN_IF_FALSE(ReadBytes(1, &byte, status));
-    zlong |= static_cast<uint64_t>(*byte & 0x7f) << shift;
-    shift += 7;
-  } while (*byte & 0x80);
-  *value = (zlong >> 1) ^ -(zlong & 1);
+  uint8_t* bytes;
+  int64_t bytes_len;
+  RETURN_IF_FALSE(
+      GetBytes(ReadWriteUtil::MAX_ZLONG_LEN, &bytes, &bytes_len, status, 
true));
+
+  uint8_t* new_bytes = bytes;
+  ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(&new_bytes, bytes + 
bytes_len);
+  if (UNLIKELY(!r.ok)) {
+    *status = ReportInvalidInt();
+    return false;
+  }
+  *value = r.val;
+  int64_t bytes_read = new_bytes - bytes;
+  RETURN_IF_FALSE(SkipBytes(bytes_read, status));
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/zigzag-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/zigzag-test.cc b/be/src/exec/zigzag-test.cc
index 6622d56..247510f 100644
--- a/be/src/exec/zigzag-test.cc
+++ b/be/src/exec/zigzag-test.cc
@@ -26,30 +26,51 @@
 
 namespace impala {
 
+void TestZInt(uint8_t* buf, int64_t buf_len, int32_t expected_val,
+    int expected_encoded_len) {
+  uint8_t* new_buf = buf;
+  ReadWriteUtil::ZIntResult r = ReadWriteUtil::ReadZInt(&new_buf, buf + 
buf_len);
+  EXPECT_TRUE(r.ok);
+  EXPECT_EQ(r.val, expected_val);
+  EXPECT_EQ(new_buf - buf, expected_encoded_len);
+}
+
 void TestZInt(int32_t value) {
   uint8_t buf[ReadWriteUtil::MAX_ZINT_LEN];
   int plen = ReadWriteUtil::PutZInt(value, static_cast<uint8_t*>(buf));
   EXPECT_TRUE(plen <= ReadWriteUtil::MAX_ZINT_LEN);
+  TestZInt(buf, sizeof(buf), value, plen);
+}
 
-  uint8_t* buf_ptr = static_cast<uint8_t*>(buf);
-  int32_t val = ReadWriteUtil::ReadZInt(&buf_ptr);
-  EXPECT_EQ(value, val);
-  int len = buf_ptr - buf;
-  EXPECT_GT(len, 0);
-  EXPECT_LE(len, sizeof(buf));
+void TestZLong(uint8_t* buf, int64_t buf_len, int64_t expected_val,
+    int expected_encoded_len) {
+  uint8_t* new_buf = buf;
+  ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(&new_buf, buf + 
buf_len);
+  EXPECT_TRUE(r.ok);
+  EXPECT_EQ(r.val, expected_val);
+  EXPECT_EQ(new_buf - buf, expected_encoded_len);
 }
 
 void TestZLong(int64_t value) {
   uint8_t buf[ReadWriteUtil::MAX_ZLONG_LEN];
   int plen = ReadWriteUtil::PutZLong(value, static_cast<uint8_t*>(buf));
   EXPECT_TRUE(plen <= ReadWriteUtil::MAX_ZLONG_LEN);
+  TestZLong(buf, sizeof(buf), value, plen);
+}
 
-  uint8_t* buf_ptr = static_cast<uint8_t*>(buf);
-  int64_t val = ReadWriteUtil::ReadZLong(&buf_ptr);
-  EXPECT_EQ(value, val);
-  int len = buf_ptr - buf;
-  EXPECT_GT(len, 0);
-  EXPECT_LE(len, sizeof(buf));
+// No expected value
+void TestZInt(uint8_t* buf, int64_t buf_len, int expected_encoded_len) {
+  uint8_t* new_buf = buf;
+  ReadWriteUtil::ZIntResult r = ReadWriteUtil::ReadZInt(&new_buf, buf + 
buf_len);
+  EXPECT_TRUE(r.ok);
+  EXPECT_EQ(new_buf - buf, expected_encoded_len);
+}
+
+void TestZLong(uint8_t* buf, int64_t buf_len, int expected_encoded_len) {
+  uint8_t* new_buf = buf;
+  ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(&new_buf, buf + 
buf_len);
+  EXPECT_TRUE(r.ok);
+  EXPECT_EQ(new_buf - buf, expected_encoded_len);
 }
 
 // Test put and get of zigzag integers and longs.
@@ -60,7 +81,7 @@ TEST(ZigzagTest, Basic) {
   TestZInt(INT_MIN);
   TestZInt(SHRT_MIN);
   TestZInt(SHRT_MAX);
-  TestZInt(0);
+  TestZLong(0);
   TestZLong(LONG_MAX);
   TestZLong(LONG_MIN);
   TestZLong(INT_MAX);
@@ -78,6 +99,66 @@ TEST(ZigzagTest, Basic) {
     TestZLong((static_cast<int64_t>(value) << 32) | value);
   }
 }
+
+// Malformed encodings must be reported through ZResult::ok rather than decoded.
+TEST(ZigzagTest, Errors) {
+  // Every byte has its continuation bit set, so no encoding ever terminates.
+  uint8_t buf[100];
+  memset(buf, 0x80, sizeof(buf));
+
+  // Test 100-byte int. The cursor is reset before each read so that every call starts
+  // at the beginning of the buffer regardless of how far the previous one advanced.
+  uint8_t* buf_ptr = buf;
+  int64_t buf_len = sizeof(buf);
+  EXPECT_FALSE(ReadWriteUtil::ReadZLong(&buf_ptr, buf + buf_len).ok);
+  buf_ptr = buf;
+  EXPECT_FALSE(ReadWriteUtil::ReadZInt(&buf_ptr, buf + buf_len).ok);
+
+  // Test truncated int: the buffer ends one byte short of the maximum encoding.
+  buf_ptr = buf;
+  buf_len = ReadWriteUtil::MAX_ZLONG_LEN - 1;
+  EXPECT_FALSE(ReadWriteUtil::ReadZLong(&buf_ptr, buf + buf_len).ok);
+  buf_ptr = buf;
+  buf_len = ReadWriteUtil::MAX_ZINT_LEN - 1;
+  EXPECT_FALSE(ReadWriteUtil::ReadZInt(&buf_ptr, buf + buf_len).ok);
+}
+
+// Exercises encodings and values that are arguably invalid but that we still accept.
+TEST(ZigzagTest, Weird) {
+  const int zint_len = ReadWriteUtil::MAX_ZINT_LEN;
+  const int zlong_len = ReadWriteUtil::MAX_ZLONG_LEN;
+  uint8_t buf[100];
+  const int64_t whole_buf_len = sizeof(buf);
+
+  // Zero, spread over two bytes.
+  buf[0] = 0x80;
+  buf[1] = 0x0;
+  TestZInt(buf, 2, 0, 2);
+  TestZLong(buf, 2, 0, 2);
+  TestZInt(buf, whole_buf_len, 0, 2);
+  TestZLong(buf, whole_buf_len, 0, 2);
+
+  // One, padded out to MAX_ZINT_LEN bytes.
+  memset(buf, 0x80, zint_len);
+  buf[0] = 0x82;
+  buf[zint_len - 1] = 0x0;
+  TestZInt(buf, zint_len, 1, zint_len);
+  TestZLong(buf, zint_len, 1, zint_len);
+  TestZInt(buf, whole_buf_len, 1, zint_len);
+  TestZLong(buf, whole_buf_len, 1, zint_len);
+
+  // One, padded out to MAX_ZLONG_LEN bytes.
+  memset(buf, 0x80, zlong_len);
+  buf[0] = 0x82;
+  buf[zlong_len - 1] = 0x0;
+  TestZLong(buf, zlong_len, 1, zlong_len);
+  TestZLong(buf, whole_buf_len, 1, zlong_len);
+
+  // A value too large for a long. Only the absence of a crash and the number of
+  // bytes consumed are checked; the decoded value is not.
+  memset(buf, 0xff, zlong_len);
+  buf[zlong_len - 1] ^= 0x80;
+  TestZLong(buf, zlong_len, zlong_len);
+
+  // A value too large for an int. Only the absence of a crash and the number of
+  // bytes consumed are checked; the decoded value is not.
+  memset(buf, 0xff, zint_len);
+  buf[zint_len - 1] ^= 0x80;
+  TestZInt(buf, zint_len, zint_len);
+
+}
 }
 
 int main(int argc, char **argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index 671d27e..6c8a621 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -245,6 +245,26 @@ error_codes = (
   ("PARTITIONED_AGG_REPARTITION_FAILS", 78,  "Cannot perform aggregation at 
node with "
    "id $0. Repartitioning did not reduce the size of a spilled partition. 
Repartitioning "
    "level $1. Number of rows $2."),
+
+  ("AVRO_TRUNCATED_BLOCK", 79, "File '$0' is corrupt: truncated data block at 
offset $1"),
+
+  ("AVRO_INVALID_UNION", 80, "File '$0' is corrupt: invalid union value $1 at 
offset $2"),
+
+  ("AVRO_INVALID_BOOLEAN", 81, "File '$0' is corrupt: invalid boolean value $1 
at offset "
+   "$2"),
+
+  ("AVRO_INVALID_LENGTH", 82, "File '$0' is corrupt: invalid length $1 at 
offset $2"),
+
+  ("SCANNER_INVALID_INT", 83, "File '$0' is corrupt: invalid encoded integer 
at offset $1"),
+
+  ("AVRO_INVALID_RECORD_COUNT", 84, "File '$0' is corrupt: invalid record 
count $1 at "
+   "offset $2"),
+
+  ("AVRO_INVALID_COMPRESSED_SIZE", 85, "File '$0' is corrupt: invalid 
compressed block "
+   "size $1 at offset $2"),
+
+  ("AVRO_INVALID_METADATA_COUNT", 86, "File '$0' is corrupt: invalid metadata 
count $1 "
+   "at offset $2"),
 )
 
 import sys


Reply via email to