Repository: kudu
Updated Branches:
  refs/heads/master 17d1367e1 -> cb92799ba


KUDU-1363: Add IN-list predicate type

Adds support in the C++ client for providing a set of equalities for a
given column. Support for using IN list predicates from the Java client
will be in a follow-up commit.

Change-Id: I986cb13097f1cf4af2b752f58c4c38f412a6a598
Reviewed-on: http://gerrit.cloudera.org:8080/2986
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9b50bd58
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9b50bd58
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9b50bd58

Branch: refs/heads/master
Commit: 9b50bd585f62834f9d65787344e545bd6846404c
Parents: 17d1367
Author: Sameer Abhyankar <[email protected]>
Authored: Thu Jun 9 08:31:42 2016 -0500
Committer: Dan Burkert <[email protected]>
Committed: Thu Sep 29 01:30:20 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client.cc                 |  32 ++-
 src/kudu/client/client.h                  |  26 ++
 src/kudu/client/predicate-test.cc         |  93 +++++++
 src/kudu/client/scan_predicate-internal.h |  33 ++-
 src/kudu/client/scan_predicate.cc         |  35 +++
 src/kudu/client/scan_predicate.h          |   1 +
 src/kudu/client/value.h                   |   1 +
 src/kudu/common/column_predicate-test.cc  | 336 ++++++++++++++++++++++++-
 src/kudu/common/column_predicate.cc       | 271 ++++++++++++++++++--
 src/kudu/common/column_predicate.h        |  46 +++-
 src/kudu/common/common.proto              |   7 +
 src/kudu/common/key_util.cc               |  26 +-
 src/kudu/common/scan_spec-test.cc         |  43 ++++
 src/kudu/common/scan_spec.cc              |   6 +-
 src/kudu/common/wire_protocol-test.cc     |  63 +++++
 src/kudu/common/wire_protocol.cc          |  18 ++
 16 files changed, 1000 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 8314019..58ee0d4 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -52,6 +52,7 @@
 #include "kudu/common/row_operations.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
@@ -177,11 +178,11 @@ Status SetInternalSignalNumber(int signum) {
   return SetStackTraceSignal(signum);
 }
 
-std::string GetShortVersionString() {
+string GetShortVersionString() {
   return VersionInfo::GetShortVersionString();
 }
 
-std::string GetAllVersionInfo() {
+string GetAllVersionInfo() {
   return VersionInfo::GetAllVersionInfo();
 }
 
@@ -357,7 +358,7 @@ Status KuduClient::ListTables(vector<string>* tables,
 }
 
 Status KuduClient::TableExists(const string& table_name, bool* exists) {
-  std::vector<std::string> tables;
+  vector<string> tables;
   RETURN_NOT_OK(ListTables(&tables, table_name));
   for (const string& table : tables) {
     if (table == table_name) {
@@ -497,12 +498,12 @@ KuduTableCreator& KuduTableCreator::schema(const 
KuduSchema* schema) {
   return *this;
 }
 
-KuduTableCreator& KuduTableCreator::add_hash_partitions(const 
std::vector<std::string>& columns,
+KuduTableCreator& KuduTableCreator::add_hash_partitions(const vector<string>& 
columns,
                                                         int32_t num_buckets) {
   return add_hash_partitions(columns, num_buckets, 0);
 }
 
-KuduTableCreator& KuduTableCreator::add_hash_partitions(const 
std::vector<std::string>& columns,
+KuduTableCreator& KuduTableCreator::add_hash_partitions(const vector<string>& 
columns,
                                                         int32_t num_buckets, 
int32_t seed) {
   PartitionSchemaPB::HashBucketSchemaPB* bucket_schema =
     data_->partition_schema_.add_hash_bucket_schemas();
@@ -514,8 +515,7 @@ KuduTableCreator& 
KuduTableCreator::add_hash_partitions(const std::vector<std::s
   return *this;
 }
 
-KuduTableCreator& KuduTableCreator::set_range_partition_columns(
-    const std::vector<std::string>& columns) {
+KuduTableCreator& KuduTableCreator::set_range_partition_columns(const 
vector<string>& columns) {
   PartitionSchemaPB::RangeSchemaPB* range_schema =
     data_->partition_schema_.mutable_range_schema();
   range_schema->Clear();
@@ -721,6 +721,24 @@ KuduPredicate* KuduTable::NewComparisonPredicate(const Slice& col_name,
   return new KuduPredicate(new ComparisonPredicateData(s->column(col_idx), op, value));
 }

+KuduPredicate* KuduTable::NewInListPredicate(const Slice& col_name,
+                                             vector<KuduValue*>* values) {
+  StringPiece name_sp(reinterpret_cast<const char*>(col_name.data()), col_name.size());
+  const Schema* s = data_->schema_.schema_;
+  int col_idx = s->find_column(name_sp);
+  if (col_idx == Schema::kColumnNotFound) {
+    // Since this function doesn't return an error, instead we create a special
+    // predicate that just returns the errors when we add it to the scanner.
+    // This makes the API more "fluent".
+    // We own the elements of 'values', but the vector itself belongs to the
+    // caller (often a stack object): free the elements, never the vector.
+    STLDeleteElements(values);
+    return new KuduPredicate(new ErrorPredicateData(
+      Status::NotFound("column not found", col_name)));
+  }
+  return new KuduPredicate(new InListPredicateData(s->column(col_idx), values));
+}
+
 ////////////////////////////////////////////////////////////
 // Error
 ////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index b3cf5ba..582291b 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -826,6 +826,32 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
                                          KuduPredicate::ComparisonOp op,
                                          KuduValue* value);

+  /// Create a new IN list predicate which can be used for scanners on this
+  /// table.
+  ///
+  /// The IN list predicate is used to specify a list of values that a column
+  /// must match. A row is filtered from the scan if the value of the column
+  /// does not equal any value from the list.
+  ///
+  /// The type of entries in the list must correspond to the type of the column
+  /// to which the predicate is to be applied. For example, if the given column
+  /// is any type of integer, the KuduValues should also be integers, with the
+  /// values in the valid range for the column type. No attempt is made to cast
+  /// between floating point and integer values, or numeric and string values.
+  ///
+  /// @param [in] col_name
+  ///   Name of the column to which the predicate applies.
+  /// @param [in] values
+  ///   Vector of values which the column will be matched against.
+  /// @return Raw pointer to an IN list predicate. The caller owns the predicate
+  ///   until it is passed into KuduScanner::AddConjunctPredicate(). Ownership
+  ///   of the elements of @c values is always transferred by this call and the
+  ///   vector is left empty; the vector object itself stays owned by the caller.
+  ///   In case of an error (e.g. an invalid column name), a non-NULL value is
+  ///   still returned; the error surfaces when adding it to a KuduScanner.
+  KuduPredicate* NewInListPredicate(const Slice& col_name,
+                                    std::vector<KuduValue*>* values);
+
   /// @return The KuduClient object associated with the table. The caller
   ///   should not free the returned pointer.
   KuduClient* client() const;

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/client/predicate-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/predicate-test.cc 
b/src/kudu/client/predicate-test.cc
index 7a4e253..4f01219 100644
--- a/src/kudu/client/predicate-test.cc
+++ b/src/kudu/client/predicate-test.cc
@@ -102,6 +102,19 @@ class PredicateTest : public KuduTest {
     return rows;
   }

+  template <typename T>
+  int CountMatchedRows(const vector<T>& values, const vector<T>& test_values) {
+    // Counts the entries of 'values' equal to at least one of 'test_values'.
+    int matched = 0;
+    for (const T& candidate : values) {
+      auto hit = std::find(test_values.begin(), test_values.end(), candidate);
+      if (hit != test_values.end()) {
+        ++matched;
+      }
+    }
+    return matched;
+  }
+
   // Returns a vector of ints from -50 (inclusive) to 50 (exclusive), and
   // boundary values.
   template <typename T>
@@ -289,6 +302,20 @@ class PredicateTest : public KuduTest {
         }));
       }
     }
+
+    // IN list predicates: check every prefix (from empty to full) of the
+    // shuffled test values. Loop over prefix lengths so that no iterator is
+    // ever advanced past end(), which is undefined behavior.
+    std::random_shuffle(test_values.begin(), test_values.end());
+    for (size_t len = 0; len <= test_values.size(); len++) {
+      auto end = test_values.begin() + len;
+      vector<KuduValue*> vals;
+      for (auto itr = test_values.begin(); itr != end; itr++) {
+        vals.push_back(KuduValue::FromInt(*itr));
+      }
+      int count = CountMatchedRows<T>(values, vector<T>(test_values.begin(), end));
+      ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+    }
   }

   // Check string predicates against the specified table.
@@ -384,6 +411,20 @@ class PredicateTest : public KuduTest {
         }));
       }
     }
+
+    // IN list predicates: check every prefix (from empty to full) of the
+    // shuffled test values. Loop over prefix lengths so that no iterator is
+    // ever advanced past end(), which is undefined behavior.
+    std::random_shuffle(test_values.begin(), test_values.end());
+    for (size_t len = 0; len <= test_values.size(); len++) {
+      auto end = test_values.begin() + len;
+      vector<KuduValue*> vals;
+      for (auto itr = test_values.begin(); itr != end; itr++) {
+        vals.push_back(KuduValue::CopyString(*itr));
+      }
+      int count = CountMatchedRows<string>(values, vector<string>(test_values.begin(), end));
+      ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+    }
   }

   shared_ptr<KuduClient> client_;
@@ -479,6 +520,30 @@ TEST_F(PredicateTest, TestBoolPredicates) {
                                                         
KuduValue::FromBool(true));
     ASSERT_EQ(1, CountRows(table, { pred }));
   }
+
+  { // value IN ()
+    vector<KuduValue*> values = { };
+    KuduPredicate* pred = table->NewInListPredicate("value", &values);
+    ASSERT_EQ(0, CountRows(table, { pred }));
+  }
+
+  { // value IN (true)
+    vector<KuduValue*> values = { KuduValue::FromBool(true) };
+    KuduPredicate* pred = table->NewInListPredicate("value", &values);
+    ASSERT_EQ(1, CountRows(table, { pred }));
+  }
+
+  { // value IN (false)
+    vector<KuduValue*> values = { KuduValue::FromBool(false) };
+    KuduPredicate* pred = table->NewInListPredicate("value", &values);
+    ASSERT_EQ(1, CountRows(table, { pred }));
+  }
+
+  { // value IN (true, false)
+    vector<KuduValue*> values = { KuduValue::FromBool(false), 
KuduValue::FromBool(true) };
+    KuduPredicate* pred = table->NewInListPredicate("value", &values);
+    ASSERT_EQ(2, CountRows(table, { pred }));
+  }
 }
 
 TEST_F(PredicateTest, TestInt8Predicates) {
@@ -682,6 +747,20 @@ TEST_F(PredicateTest, TestFloatPredicates) {
       }));
     }
   }
+
+  // IN list predicates: check every prefix (from empty to full) of the
+  // shuffled test values. Loop over prefix lengths so that no iterator is
+  // ever advanced past end(), which is undefined behavior.
+  std::random_shuffle(test_values.begin(), test_values.end());
+  for (size_t len = 0; len <= test_values.size(); len++) {
+    auto end = test_values.begin() + len;
+    vector<KuduValue*> vals;
+    for (auto itr = test_values.begin(); itr != end; itr++) {
+      vals.push_back(KuduValue::FromFloat(*itr));
+    }
+    int count = CountMatchedRows<float>(values, vector<float>(test_values.begin(), end));
+    ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+  }
 }

 TEST_F(PredicateTest, TestDoublePredicates) {
@@ -785,6 +864,20 @@ TEST_F(PredicateTest, TestDoublePredicates) {
       }));
     }
   }
+
+  // IN list predicates: check every prefix (from empty to full) of the
+  // shuffled test values. Loop over prefix lengths so that no iterator is
+  // ever advanced past end(), which is undefined behavior.
+  std::random_shuffle(test_values.begin(), test_values.end());
+  for (size_t len = 0; len <= test_values.size(); len++) {
+    auto end = test_values.begin() + len;
+    vector<KuduValue*> vals;
+    for (auto itr = test_values.begin(); itr != end; itr++) {
+      vals.push_back(KuduValue::FromDouble(*itr));
+    }
+    int count = CountMatchedRows<double>(values, vector<double>(test_values.begin(), end));
+    ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+  }
 }

 TEST_F(PredicateTest, TestStringPredicates) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/client/scan_predicate-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_predicate-internal.h 
b/src/kudu/client/scan_predicate-internal.h
index 61205f4..05585f6 100644
--- a/src/kudu/client/scan_predicate-internal.h
+++ b/src/kudu/client/scan_predicate-internal.h
@@ -17,9 +17,11 @@
 #ifndef KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H
 #define KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H
 
+#include <vector>
+
 #include "kudu/client/scan_predicate.h"
-#include "kudu/client/value.h"
 #include "kudu/client/value-internal.h"
+#include "kudu/client/value.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/memory/arena.h"
@@ -46,7 +48,7 @@ class KuduPredicate::Data {
 class ErrorPredicateData : public KuduPredicate::Data {
  public:
   explicit ErrorPredicateData(const Status& s)
-  : status_(s) {
+      : status_(s) {
   }
 
   virtual ~ErrorPredicateData() {
@@ -77,7 +79,7 @@ class ComparisonPredicateData : public KuduPredicate::Data {
   Status AddToScanSpec(ScanSpec* spec, Arena* arena) override;
 
   ComparisonPredicateData* Clone() const override {
-      return new ComparisonPredicateData(col_, op_, val_->Clone());
+    return new ComparisonPredicateData(col_, op_, val_->Clone());
   }
 
  private:
@@ -88,6 +90,31 @@ class ComparisonPredicateData : public KuduPredicate::Data {
   gscoped_ptr<KuduValue> val_;
 };

+// An IN-list predicate for a column; owns the KuduValue elements in vals_.
+class InListPredicateData : public KuduPredicate::Data {
+ public:
+  InListPredicateData(ColumnSchema col, std::vector<KuduValue*>* values);
+
+  virtual ~InListPredicateData();
+
+  Status AddToScanSpec(ScanSpec* spec, Arena* arena) override;
+
+  InListPredicateData* Clone() const override {
+    std::vector<KuduValue*> values;
+    values.reserve(vals_.size());  // Not values(n): that would prepend n nulls.
+    for (KuduValue* val : vals_) {
+      values.push_back(val->Clone());
+    }
+    return new InListPredicateData(col_, &values);
+  }
+
+ private:
+  friend class KuduScanner;
+
+  ColumnSchema col_;
+  std::vector<KuduValue*> vals_;
+};
+
 } // namespace client
 } // namespace kudu
 #endif /* KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H */

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/client/scan_predicate.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_predicate.cc 
b/src/kudu/client/scan_predicate.cc
index f12bcc2..06ba70b 100644
--- a/src/kudu/client/scan_predicate.cc
+++ b/src/kudu/client/scan_predicate.cc
@@ -19,14 +19,17 @@
 
 #include <boost/optional.hpp>
 #include <utility>
+#include <vector>
 
 #include "kudu/client/scan_predicate-internal.h"
 #include "kudu/client/value-internal.h"
 #include "kudu/client/value.h"
 #include "kudu/common/scan_spec.h"
+#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 
 using std::move;
+using std::vector;
 using boost::optional;
 
 namespace kudu {
@@ -104,5 +107,37 @@ ComparisonPredicateData::AddToScanSpec(ScanSpec* spec, Arena* arena) {
   return Status::OK();
 }

+InListPredicateData::InListPredicateData(ColumnSchema col,
+                                         vector<KuduValue*>* values)
+    : col_(move(col)) {
+  vals_.swap(*values);
+}
+
+InListPredicateData::~InListPredicateData() {
+  STLDeleteElements(&vals_);
+}
+
+Status InListPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
+  vector<const void*> vals_list;
+  vals_list.reserve(vals_.size());
+  for (auto value : vals_) {
+    void* val_void;
+    // The local vals_ list consists of KuduValue pointers that make up the IN
+    // list predicate. For each one, CheckTypeAndGetPointer (whose error Status is
+    // returned as-is) yields a pointer (void*) to the underlying value. These
+    // pointers are swapped into the predicate by ColumnPredicate::InList, which
+    // does not copy the pointed-to data ('arena' is unused): it stays owned by the
+    // KuduValues in vals_. NOTE(review): presumably vals_ must outlive 'spec'.
+    RETURN_NOT_OK(value->data_->CheckTypeAndGetPointer(col_.name(),
+                                                       col_.type_info()->physical_type(),
+                                                       &val_void));
+    vals_list.push_back(val_void);
+  }
+
+  spec->AddPredicate(ColumnPredicate::InList(col_, &vals_list));
+
+  return Status::OK();
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/client/scan_predicate.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_predicate.h b/src/kudu/client/scan_predicate.h
index 99736c5..bb97151 100644
--- a/src/kudu/client/scan_predicate.h
+++ b/src/kudu/client/scan_predicate.h
@@ -57,6 +57,7 @@ class KUDU_EXPORT KuduPredicate {
  private:
   friend class ComparisonPredicateData;
   friend class ErrorPredicateData;
+  friend class InListPredicateData;
   friend class KuduTable;
   friend class ScanConfiguration;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/client/value.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/value.h b/src/kudu/client/value.h
index 03a38f5..2e3fb73 100644
--- a/src/kudu/client/value.h
+++ b/src/kudu/client/value.h
@@ -60,6 +60,7 @@ class KUDU_EXPORT KuduValue {
   ~KuduValue();
  private:
   friend class ComparisonPredicateData;
+  friend class InListPredicateData;
   friend class KuduColumnSpec;
 
   class KUDU_NO_EXPORT Data;

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/column_predicate-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate-test.cc 
b/src/kudu/common/column_predicate-test.cc
index 77450f0..ba691f2 100644
--- a/src/kudu/common/column_predicate-test.cc
+++ b/src/kudu/common/column_predicate-test.cc
@@ -39,6 +39,8 @@ class TestColumnPredicate : public KuduTest {
     ColumnPredicate a_base(a);
     ColumnPredicate b_base(b);
 
+    SCOPED_TRACE(strings::Substitute("a: $0, b: $1", a.ToString(), 
b.ToString()));
+
     a_base.Merge(b);
     b_base.Merge(a);
 
@@ -277,6 +279,260 @@ class TestColumnPredicate : public KuduTest {
               ColumnPredicate::None(column),
               PredicateType::None);
 
+    // InList + InList
+
+    vector<const void*> top_list;
+    vector<const void*> bot_list;
+    vector<const void*> res_list;
+
+    //   | | |  AND
+    //   | | |
+    // = | | |
+    top_list = { &values[1], &values[3], &values[5] };
+    bot_list = { &values[1], &values[3], &values[5] };
+    res_list = { &values[1], &values[3], &values[5] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::InList(column, &res_list),
+              PredicateType::InList);
+
+    //   | | |  AND
+    //   | |
+    // = | |
+    top_list = { &values[1], &values[3], &values[6] };
+    bot_list = { &values[1], &values[3] };
+    res_list = { &values[1], &values[3] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::InList(column, &res_list),
+              PredicateType::InList);
+
+    //   | | |  AND
+    //     | |
+    // =   | |
+    top_list = { &values[1], &values[3], &values[6] };
+    bot_list = { &values[3], &values[6] };
+    res_list = { &values[6], &values[3] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::InList(column, &res_list),
+              PredicateType::InList);
+
+    //     | | |  AND
+    //   | | |
+    // =   | |
+    top_list = { &values[2], &values[3], &values[4] };
+    bot_list = { &values[1], &values[2], &values[3] };
+    res_list = { &values[2], &values[3] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::InList(column, &res_list),
+              PredicateType::InList);
+
+    //      | | |  AND
+    //   | | |
+    // = NONE
+    top_list = { &values[3], &values[5], &values[6] };
+    bot_list = { &values[0], &values[2], &values[4] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //       | | |  AND
+    //   | | |
+    // =     |
+    top_list = { &values[1], &values[2], &values[3] };
+    bot_list = { &values[3], &values[4], &values[5] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::Equality(column, &values[3]),
+              PredicateType::Equality);
+
+    //         | | |  AND
+    //   | | |
+    // = None
+    top_list = { &values[4], &values[5], &values[6] };
+    bot_list = { &values[1], &values[2], &values[3] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //     | | |  AND
+    //    |||||
+    // =   | |
+    top_list = { &values[1], &values[3], &values[5] };
+    bot_list = { &values[0], &values[1], &values[2], &values[3], &values[4] };
+    res_list = { &values[1], &values[3] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::InList(column, &res_list),
+              PredicateType::InList);
+
+    //    | | |  AND
+    //     | |
+    // =  none
+    top_list = { &values[1], &values[3], &values[5] };
+    bot_list = { &values[2], &values[4] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //    | | |  AND
+    //     |||
+    // =    |
+    top_list = { &values[1], &values[3], &values[5] };
+    bot_list = { &values[2], &values[3], &values[4] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::Equality(column, &values[3]),
+              PredicateType::Equality);
+
+    //   | |   AND
+    //    | |
+    // = none
+    top_list = { &values[1], &values[3] };
+    bot_list = { &values[2], &values[4] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //   | |   AND
+    //       | |
+    // = none
+    top_list = { &values[0], &values[2] };
+    bot_list = { &values[3], &values[5] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+
+    // InList + Equality
+
+    //   | | |  AND
+    // |
+    // = none
+    top_list = { &values[2], &values[3], &values[4] };
+    bot_list = { &values[1] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+
+    //   | | | AND
+    //   |
+    // = |
+    top_list = { &values[1], &values[3], &values[6] };
+    bot_list = { &values[1] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::Equality(column, &values[1]),
+              PredicateType::Equality);
+
+    //  | | | AND
+    //    |
+    // =  |
+    top_list = { &values[1], &values[3], &values[6] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::Equality(column, &values[3]),
+              ColumnPredicate::Equality(column, &values[3]),
+              PredicateType::Equality);
+
+    //  | | | AND
+    //     |
+    // = none
+    top_list = { &values[1], &values[3], &values[6] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::Equality(column, &values[4]),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //  | | |  AND
+    //         |
+    // =  none
+    top_list = { &values[1], &values[3], &values[5] };
+    TestMerge(ColumnPredicate::InList(column, &top_list),
+              ColumnPredicate::Equality(column, &values[6]),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+
+    // InList + Range
+
+    //     [---) AND
+    //   | | | | |
+    // =   | |
+    bot_list = { &values[0], &values[1], &values[2], &values[3], &values[4] };
+    res_list = { &values[1], &values[2] };
+    TestMerge(ColumnPredicate::Range(column, &values[1], &values[3]),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::InList(column, &res_list),
+              PredicateType::InList);
+
+    //    [------) AND
+    //  |        | |
+    // = None
+    bot_list = { &values[1], &values[4], &values[5] };
+    TestMerge(ColumnPredicate::Range(column, &values[2], &values[4]),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //  [------) AND
+    //            | |
+    // =
+    // None
+    bot_list = { &values[5], &values[6] };
+    TestMerge(ColumnPredicate::Range(column, &values[1], &values[4]),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //       [------) AND
+    //   | |
+    // =
+    // None
+    bot_list = { &values[0], &values[1] };
+    TestMerge(ColumnPredicate::Range(column, &values[3], &values[6]),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //   [------) AND
+    //          | |
+    // =
+    // None
+    bot_list = { &values[4], &values[5] };
+    TestMerge(ColumnPredicate::Range(column, &values[1], &values[4]),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    //      [-----------> AND
+    //    | |  |
+    // =    |  |
+    bot_list = { &values[2], &values[3], &values[4] };
+    res_list = { &values[3], &values[4] };
+    TestMerge(ColumnPredicate::Range(column, &values[3], nullptr),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::InList(column, &res_list),
+              PredicateType::InList);
+
+    // <----) AND
+    //   |  |  |
+    // = |
+    // Only values[2] lies below the exclusive upper bound values[3].
+    bot_list = { &values[2], &values[3], &values[4] };
+    TestMerge(ColumnPredicate::Range(column, nullptr, &values[3]),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::Equality(column, &values[2]),
+              PredicateType::Equality);
+
     // None
 
     // None AND
@@ -324,6 +580,16 @@ class TestColumnPredicate : public KuduTest {
               ColumnPredicate::None(column),
               PredicateType::None);
 
+    // None AND
+    // | | |
+    // =
+    // None
+    bot_list = { &values[2], &values[3], &values[4] };
+    TestMerge(ColumnPredicate::None(column),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
     // IS NOT NULL
 
     // IS NOT NULL AND
@@ -379,6 +645,17 @@ class TestColumnPredicate : public KuduTest {
               ColumnPredicate::Range(column, &values[2], nullptr),
               ColumnPredicate::Range(column, &values[2], nullptr),
               PredicateType::Range);
+
+    // IS NOT NULL AND
+    // | | |
+    // =
+    // | | |
+    bot_list = { &values[2], &values[3], &values[4] };
+    res_list = { &values[2], &values[3], &values[4] };
+    TestMerge(ColumnPredicate::IsNotNull(column),
+              ColumnPredicate::InList(column, &bot_list),
+              ColumnPredicate::InList(column, &res_list),
+              PredicateType::InList);
   }
 };
 
@@ -518,6 +795,64 @@ TEST_F(TestColumnPredicate, TestExclusive) {
   }
 }
 
+// Test ColumnPredicate::InList, including its simplification to None/Equality/IsNotNull.
+TEST_F(TestColumnPredicate, TestInList) {
+  vector<const void*> values;
+  {
+    ColumnSchema column("c", INT32);
+    int five = 5;
+    int six = 6;
+    int ten = 10;
+
+    values = {};
+    ASSERT_EQ(PredicateType::None,
+              ColumnPredicate::InList(column, &values).predicate_type());
+
+    values = { &five };
+    ASSERT_EQ(PredicateType::Equality,
+              ColumnPredicate::InList(column, &values).predicate_type());
+
+    values = { &five, &six, &ten };
+    ASSERT_EQ(PredicateType::InList,
+              ColumnPredicate::InList(column, &values).predicate_type());
+  }
+  {
+    Slice a("a");
+    Slice b("b");
+    Slice c("c");
+    ColumnSchema column("c", STRING);
+
+    values = {};
+    ASSERT_EQ(PredicateType::None,
+              ColumnPredicate::InList(column, &values).predicate_type());
+
+    values = { &a };
+    ASSERT_EQ(PredicateType::Equality,
+              ColumnPredicate::InList(column, &values).predicate_type());
+
+    values = { &a, &b, &c };
+    ASSERT_EQ(PredicateType::InList,
+              ColumnPredicate::InList(column, &values).predicate_type());
+  }
+  {
+    bool t = true;
+    bool f = false;
+    ColumnSchema column("c", BOOL);
+
+    values = {};
+    ASSERT_EQ(PredicateType::None,
+              ColumnPredicate::InList(column, &values).predicate_type());
+
+    values = { &t };
+    ASSERT_EQ(PredicateType::Equality,
+              ColumnPredicate::InList(column, &values).predicate_type());
+
+    values = { &t, &f, &t, &f };
+    ASSERT_EQ(PredicateType::IsNotNull,
+              ColumnPredicate::InList(column, &values).predicate_type());
+  }
+}
+
 // Test that column predicate comparison works correctly: ordered by predicate
 // type first, then size of the column type.
 TEST_F(TestColumnPredicate, TestSelectivity) {
@@ -559,5 +894,4 @@ TEST_F(TestColumnPredicate, TestSelectivity) {
                                   ColumnPredicate::Equality(column_s, &one_s)),
             0);
 }
-
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/column_predicate.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.cc 
b/src/kudu/common/column_predicate.cc
index 229dcd6..4bc270b 100644
--- a/src/kudu/common/column_predicate.cc
+++ b/src/kudu/common/column_predicate.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/common/column_predicate.h"
 
+#include <algorithm>
 #include <utility>
 
 #include "kudu/common/key_util.h"
@@ -26,6 +27,7 @@
 #include "kudu/util/memory/arena.h"
 
 using std::move;
+using std::vector;
 
 namespace kudu {
 
@@ -39,6 +41,16 @@ ColumnPredicate::ColumnPredicate(PredicateType 
predicate_type,
       upper_(upper) {
 }
 
+// Private constructor for InList predicates. The contents of '*values' are
+// transferred into this predicate and the caller's vector is left empty. The
+// range bounds are not used by InList predicates, so both are null.
+ColumnPredicate::ColumnPredicate(PredicateType predicate_type,
+                                 ColumnSchema column,
+                                 vector<const void*>* values)
+    : predicate_type_(predicate_type),
+      column_(move(column)),
+      lower_(nullptr),
+      upper_(nullptr),
+      values_(move(*values)) {
+  // A moved-from vector is only "valid but unspecified"; clear it explicitly
+  // so the caller always observes an empty vector.
+  values->clear();
+}
+
 ColumnPredicate ColumnPredicate::Equality(ColumnSchema column, const void* 
value) {
   CHECK(value != nullptr);
   return ColumnPredicate(PredicateType::Equality, move(column), value, 
nullptr);
@@ -53,6 +65,26 @@ ColumnPredicate ColumnPredicate::Range(ColumnSchema column,
   return pred;
 }
 
+// Builds an IN-list predicate over 'column'.
+//
+// The pointers in '*values' are sorted by the column type's ordering and
+// de-duplicated in place, then handed over to the new predicate (leaving
+// '*values' empty). The pointed-to cell values are not copied. The result is
+// simplified, e.g. an empty list becomes None and a single value becomes an
+// Equality predicate.
+ColumnPredicate ColumnPredicate::InList(ColumnSchema column,
+                                        vector<const void*>* values) {
+  CHECK(values != nullptr);
+
+  const auto* type_info = column.type_info();
+  auto less = [type_info] (const void* lhs, const void* rhs) {
+    return type_info->Compare(lhs, rhs) < 0;
+  };
+  auto equal = [type_info] (const void* lhs, const void* rhs) {
+    return type_info->Compare(lhs, rhs) == 0;
+  };
+
+  // Order the list, then drop adjacent duplicates.
+  std::sort(values->begin(), values->end(), less);
+  auto new_end = std::unique(values->begin(), values->end(), equal);
+  values->erase(new_end, values->end());
+
+  ColumnPredicate result(PredicateType::InList, move(column), values);
+  result.Simplify();
+  return result;
+}
+
 boost::optional<ColumnPredicate> ColumnPredicate::InclusiveRange(ColumnSchema 
column,
                                                                  const void* 
lower,
                                                                  const void* 
upper,
@@ -122,6 +154,11 @@ void ColumnPredicate::SetToNone() {
 }
 
 void ColumnPredicate::Simplify() {
+  // TODO(dan): we are missing some simplification opportunities here:
+  //    * range predicates including the entire range of a bool/integer
+  //      (`my_int8 >= -128`) can be simplified to `IS NOT NULL`.
+  //    * `IN` predicates including all values of a bool/integer
+  //      (`my_bool IN (true, false)`) can be simplified to `IS NOT NULL`.
   switch (predicate_type_) {
     case PredicateType::None:
     case PredicateType::Equality:
@@ -139,6 +176,27 @@ void ColumnPredicate::Simplify() {
       }
       return;
     };
+    case PredicateType::InList: {
+      if (values_.empty()) {
+        // If the list is empty, no results can be returned.
+        SetToNone();
+      } else if (values_.size() == 1) {
+        // List has only one value, so convert to Equality
+        predicate_type_ = PredicateType::Equality;
+        lower_ = values_[0];
+        values_.clear();
+      } else if (column_.type_info()->type() == BOOL) {
+        // If this is a boolean IN list with both true and false in the list,
+        // then we can just convert it to IS NOT NULL. This same simplification
+        // could be done for other integer types, but it's probably not as
+        // common (and hard to test).
+        predicate_type_ = PredicateType::IsNotNull;
+        lower_ = nullptr;
+        upper_ = nullptr;
+        values_.clear();
+      }
+      return;
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -165,6 +223,11 @@ void ColumnPredicate::Merge(const ColumnPredicate& other) {
       predicate_type_ = other.predicate_type_;
       lower_ = other.lower_;
       upper_ = other.upper_;
+      values_ = other.values_;
+      return;
+    };
+    case PredicateType::InList: {
+      MergeIntoInList(other);
       return;
     };
   }
@@ -210,6 +273,25 @@ void ColumnPredicate::MergeIntoRange(const 
ColumnPredicate& other) {
       return;
     };
     case PredicateType::IsNotNull: return;
+    case PredicateType::InList : {
+      // The InList predicate values are examined to check whether
+      // they lie in the range.
+      // The current predicate is then converted from a range predicate to
+      // an InList predicate (since it is more selective).
+      // The number of values in the InList will depend on how many
+      // values were within the range.
+      // The call to Simplify() will then convert the InList if appropriate:
+      // i.e an InList with zero entries gets converted to a NONE
+      //     and InList with 1 entry gets converted into an Equality.
+      values_ = other.values_;
+      values_.erase(std::remove_if(values_.begin(), values_.end(),
+                                   [this] (const void* v) {
+                                     return !CheckValueInRange(v);
+                                   }), values_.end());
+      predicate_type_ = PredicateType::InList;
+      Simplify();
+      return;
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -223,8 +305,7 @@ void ColumnPredicate::MergeIntoEquality(const 
ColumnPredicate& other) {
       return;
     }
     case PredicateType::Range: {
-      if ((other.lower_ != nullptr && column_.type_info()->Compare(lower_, 
other.lower_) < 0) ||
-          (other.upper_ != nullptr && column_.type_info()->Compare(lower_, 
other.upper_) >= 0)) {
+      if (!other.CheckValueInRange(lower_)) {
         // This equality value does not fall in the other range.
         SetToNone();
       }
@@ -237,17 +318,124 @@ void ColumnPredicate::MergeIntoEquality(const 
ColumnPredicate& other) {
       return;
     };
     case PredicateType::IsNotNull: return;
+    case PredicateType::InList : {
+      // The equality value needs to be a member of the InList
+      if (!other.CheckValueInList(lower_)) {
+        SetToNone();
+      }
+      return;
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
 
+// Merges 'other' (a predicate on the same column) into this InList predicate,
+// so that this predicate becomes the conjunction of both. The result may be
+// simplified further (to None, Equality, ...) via Simplify().
+void ColumnPredicate::MergeIntoInList(const ColumnPredicate& other) {
+  CHECK(predicate_type_ == PredicateType::InList);
+  DCHECK(values_.size() > 1);
+
+  // Less-than ordering on raw cell pointers. values_ (and other.values_ when
+  // 'other' is an InList) are sorted and unique under this ordering.
+  auto search_by = [&] (const void* lhs, const void* rhs) {
+    return this->column_.type_info()->Compare(lhs, rhs) < 0;
+  };
+
+  switch (other.predicate_type()) {
+    case PredicateType::None : {
+      SetToNone();
+      return;
+    };
+    case PredicateType::Range: {
+      // Only values within the range [other.lower_, other.upper_) are retained.
+
+      // Remove all values greater than or equal to the exclusive upper bound.
+      if (other.upper_ != nullptr) {
+        // lower_bound is used here instead of upper_bound, since the upper
+        // bound of the range is exclusive, and the in list is inclusive.
+        auto upper = std::lower_bound(values_.begin(), values_.end(),
+                                      other.upper_, search_by);
+        values_.erase(upper, values_.end());
+      }
+
+      // Remove all values less than the inclusive lower bound.
+      if (other.lower_ != nullptr) {
+        auto lower = std::lower_bound(values_.begin(), values_.end(),
+                                      other.lower_, search_by);
+        values_.erase(values_.begin(), lower);
+      }
+
+      Simplify();
+      return;
+    }
+    case PredicateType::Equality: {
+      if (CheckValueInList(other.lower_)) {
+        // The value falls in the list, so change to an Equality predicate.
+        // The list is no longer needed; clear it just as Simplify() does when
+        // it converts a single-value list into an Equality.
+        predicate_type_ = PredicateType::Equality;
+        lower_ = other.lower_;
+        upper_ = nullptr;
+        values_.clear();
+      } else {
+        SetToNone(); // Value does not fall in list
+      }
+      return;
+    }
+    case PredicateType::IsNotNull: return;
+    case PredicateType::InList: {
+      // Merge the 'other' IN list into this IN list. The basic idea is to loop
+      // through this predicate list, retaining only the values which are also
+      // contained in the other predicate list. We apply an optimization first:
+      // all values from this in list which fall outside the range of the other
+      // IN list are removed, and the remaining values in this IN list are only
+      // checked against values in the other list which are in this list's
+      // range. This doesn't reduce the worst-case complexity, but can really
+      // speed up the merge for big lists in certain cases. This optimization
+      // relies on the lists being sorted.
+      DCHECK(other.values_.size() > 1);
+
+      // Remove all values in this IN list which are greater than the largest
+      // value in the other list.
+      values_.erase(std::upper_bound(values_.begin(), values_.end(),
+                                     other.values_.back(), search_by),
+                    values_.end());
+
+      // Remove all values in this IN list which are less than the smallest
+      // value in the other list.
+      values_.erase(values_.begin(),
+                    std::lower_bound(values_.begin(), values_.end(),
+                                     other.values_.front(), search_by));
+
+      if (values_.empty()) {
+        SetToNone();
+        return;
+      }
+
+      // Find the sublist in the other IN list which overlaps with this IN list.
+      auto other_start = std::lower_bound(other.values_.begin(), other.values_.end(),
+                                          values_.front(), search_by);
+      auto other_end = std::upper_bound(other_start, other.values_.end(),
+                                        values_.back(), search_by);
+
+      // Returns true if the value is *not* present in the other list.
+      // Advances other_start to the first element of the overlapping sublist
+      // which is not less than v; because both lists are sorted, the next
+      // lookup can resume from there. other_start is checked against
+      // other_end before it is dereferenced.
+      auto other_absent = [&] (const void* v) {
+        other_start = std::lower_bound(other_start, other_end, v, search_by);
+        return other_start == other_end ||
+               this->column_.type_info()->Compare(v, *other_start) != 0;
+      };
+
+      // Iterate through the values_ list and remove elements that do not exist
+      // in the other list.
+      values_.erase(std::remove_if(values_.begin(), values_.end(), other_absent),
+                    values_.end());
+      Simplify();
+      return;
+    };
+    default: LOG(FATAL) << "unknown predicate type";
+  }
+}
+
 namespace {
 template <typename P>
 void ApplyPredicate(const ColumnBlock& block, SelectionVector* sel, P p) {
   if (block.is_nullable()) {
     for (size_t i = 0; i < block.nrows(); i++) {
       if (!sel->IsRowSelected(i)) continue;
-      const void *cell = block.nullable_cell_ptr(i);
+      const void* cell = block.nullable_cell_ptr(i);
       if (cell == nullptr || !p(cell)) {
         BitmapClear(sel->mutable_bitmap(), i);
       }
@@ -255,7 +443,7 @@ void ApplyPredicate(const ColumnBlock& block, 
SelectionVector* sel, P p) {
   } else {
     for (size_t i = 0; i < block.nrows(); i++) {
       if (!sel->IsRowSelected(i)) continue;
-      const void *cell = block.cell_ptr(i);
+      const void* cell = block.cell_ptr(i);
       if (!p(cell)) {
         BitmapClear(sel->mutable_bitmap(), i);
       }
@@ -301,7 +489,16 @@ void ColumnPredicate::EvaluateForPhysicalType(const 
ColumnBlock& block,
         }
       }
       return;
-    }
+    };
+    case PredicateType::InList: {
+      ApplyPredicate(block, sel, [this] (const void* cell) {
+        return std::binary_search(values_.begin(), values_.end(), cell,
+                                  [] (const void* lhs, const void* rhs) {
+                                    return 
DataTypeTraits<PhysicalType>::Compare(lhs, rhs) < 0;
+                                  });
+      });
+      return;
+    };
     default:
       LOG(FATAL) << "unknown predicate type";
   }
@@ -347,6 +544,22 @@ string ColumnPredicate::ToString() const {
     case PredicateType::IsNotNull: {
       return strings::Substitute("`$0` IS NOT NULL", column_.name());
     };
+    case PredicateType::InList: {
+      string ss = "`";
+      ss.append(column_.name());
+      ss.append("` IN (");
+      bool is_first = true;
+      for (auto* value : values_) {
+        if (is_first) {
+          is_first = false;
+        } else {
+          ss.append(", ");
+        }
+        ss.append(column_.Stringify(value));
+      }
+      ss.append(")");
+      return ss;
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -355,18 +568,39 @@ bool ColumnPredicate::operator==(const ColumnPredicate& 
other) const {
   if (!column_.Equals(other.column_, false)) { return false; }
   if (predicate_type_ != other.predicate_type_) {
     return false;
-  } else if (predicate_type_ == PredicateType::Equality) {
-    return column_.type_info()->Compare(lower_, other.lower_) == 0;
-  } else if (predicate_type_ == PredicateType::Range) {
-    return (lower_ == other.lower_ ||
-            (lower_ != nullptr && other.lower_ != nullptr &&
-             column_.type_info()->Compare(lower_, other.lower_) == 0)) &&
-           (upper_ == other.upper_ ||
-            (upper_ != nullptr && other.upper_ != nullptr &&
-             column_.type_info()->Compare(upper_, other.upper_) == 0));
-  } else {
-    return true;
   }
+  // Both predicates have the same type here; compare the type-specific state.
+  switch (predicate_type_) {
+    case PredicateType::Equality: {
+      return column_.type_info()->Compare(lower_, other.lower_) == 0;
+    };
+    case PredicateType::Range: {
+      return (lower_ == other.lower_ ||
+              (lower_ != nullptr && other.lower_ != nullptr &&
+               column_.type_info()->Compare(lower_, other.lower_) == 0)) &&
+             (upper_ == other.upper_ ||
+              (upper_ != nullptr && other.upper_ != nullptr &&
+               column_.type_info()->Compare(upper_, other.upper_) == 0));
+    };
+    case PredicateType::InList: {
+      // Both value lists are kept sorted and de-duplicated, so two IN lists
+      // are equal exactly when they match element by element.
+      if (values_.size() != other.values_.size()) return false;
+      for (size_t i = 0; i < values_.size(); i++) {
+        if (column_.type_info()->Compare(values_[i], other.values_[i]) != 0) return false;
+      }
+      return true;
+    };
+    case PredicateType::None:
+    case PredicateType::IsNotNull: return true;
+  }
+  LOG(FATAL) << "unknown predicate type";
+}
+
+// Returns whether 'value' lies in this Range predicate's bounds
+// [lower_, upper_). A null bound is treated as unbounded on that side.
+bool ColumnPredicate::CheckValueInRange(const void* value) const {
+  CHECK(predicate_type_ == PredicateType::Range);
+  // The lower bound is inclusive: reject values strictly below it.
+  if (lower_ != nullptr && column_.type_info()->Compare(lower_, value) > 0) {
+    return false;
+  }
+  // The upper bound is exclusive: reject values at or above it.
+  if (upper_ != nullptr && column_.type_info()->Compare(upper_, value) <= 0) {
+    return false;
+  }
+  return true;
+}
+
+// Returns whether 'value' is one of this predicate's IN-list values. Relies on
+// values_ being sorted by the column type's ordering, so a binary search
+// suffices.
+bool ColumnPredicate::CheckValueInList(const void* value) const {
+  auto less_than = [this] (const void* lhs, const void* rhs) {
+    return column_.type_info()->Compare(lhs, rhs) < 0;
+  };
+  return std::binary_search(values_.begin(), values_.end(), value, less_than);
 }
 
 namespace {
@@ -375,8 +609,9 @@ int SelectivityRank(const ColumnPredicate& predicate) {
   switch (predicate.predicate_type()) {
     case PredicateType::None: rank = 0; break;
     case PredicateType::Equality: rank = 1; break;
-    case PredicateType::Range: rank = 2; break;
-    case PredicateType::IsNotNull: rank = 3; break;
+    case PredicateType::InList: rank = 2; break;
+    case PredicateType::Range: rank = 3; break;
+    case PredicateType::IsNotNull: rank = 4; break;
     default: LOG(FATAL) << "unknown predicate type";
   }
   return rank * (kLargestTypeSize + 1) + 
predicate.column().type_info()->size();

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/column_predicate.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.h 
b/src/kudu/common/column_predicate.h
index 9a7a880..aa996a1 100644
--- a/src/kudu/common/column_predicate.h
+++ b/src/kudu/common/column_predicate.h
@@ -19,6 +19,7 @@
 
 #include <boost/optional.hpp>
 #include <string>
+#include <vector>
 
 #include "kudu/common/schema.h"
 
@@ -44,6 +45,10 @@ enum class PredicateType {
 
   // A predicate which evaluates to true if the value is not null.
   IsNotNull,
+
+  // A predicate which evaluates to true if the column value is present in
+  // a value list.
+  InList,
 };
 
 // A predicate which can be evaluated over a block of column values.
@@ -107,6 +112,12 @@ class ColumnPredicate {
   // Creates a new IS NOT NULL predicate for the column.
   static ColumnPredicate IsNotNull(ColumnSchema column);
 
+  // Create a new IN <LIST> predicate for the column.
+  //
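+  // The pointed-to values are not copied, and must outlive the returned predicate.
+  // The vector itself is sorted and de-duplicated in place, and its contents are
+  // then moved into the predicate, leaving '*values' empty.
+  // The InList will be simplified into an Equality, IsNotNull (BOOL columns) or
+  // None if possible.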
+  static ColumnPredicate InList(ColumnSchema column, std::vector<const void*>* 
values);
+
   // Returns the type of this predicate.
   PredicateType predicate_type() const {
     return predicate_type_;
@@ -160,7 +171,13 @@ class ColumnPredicate {
       };
       case PredicateType::IsNotNull: {
         return true;
-      }
+      };
+      case PredicateType::InList: {
+        return std::binary_search(values_.begin(), values_.end(), cell,
+                                  [] (const void* lhs, const void* rhs) {
+                                    return 
DataTypeTraits<PhysicalType>::Compare(lhs, rhs) < 0;
+                                  });
+      };
     }
     LOG(FATAL) << "unknown predicate type";
   }
@@ -189,16 +206,27 @@ class ColumnPredicate {
     return column_;
   }
 
+  // Returns the list of values if this is an in-list predicate.
+  // The values are guaranteed to be unique and in sorted order (ordered by the
+  // column type's comparison, as established by InList()). The pointers refer
+  // to cell data owned elsewhere; they are not copies.
+  const std::vector<const void*>& raw_values() const {
+    return values_;
+  }
+
  private:
 
   friend class TestColumnPredicate;
 
-  // Creates a new column predicate.
+  // Creates a new range or equality column predicate.
   ColumnPredicate(PredicateType predicate_type,
                   ColumnSchema column,
                   const void* lower,
                   const void* upper);
 
+  // Creates a new InList column predicate. The contents of '*values' are taken
+  // over by the new predicate, leaving '*values' empty. The values are expected
+  // to already be sorted and de-duplicated (InList() does this before calling).
+  // The lower and upper bounds are set to null.
+  ColumnPredicate(PredicateType predicate_type,
+                  ColumnSchema column,
+                  std::vector<const void*>* values);
+
   // Creates a new predicate which matches no values.
   static ColumnPredicate None(ColumnSchema column);
 
@@ -220,6 +248,17 @@ class ColumnPredicate {
   void EvaluateForPhysicalType(const ColumnBlock& block,
                                SelectionVector* sel) const;
 
+  // Merge another predicate into this InList predicate.
+  void MergeIntoInList(const ColumnPredicate& other);
+
+  // For a Range type predicate, this helper function checks
+  // whether a given value is in the range.
+  bool CheckValueInRange(const void* value) const;
+
+  // For an InList type predicate, this helper function checks
+  // whether a given value is in the list.
+  bool CheckValueInList(const void* value) const;
+
   // The type of this predicate.
   PredicateType predicate_type_;
 
@@ -232,6 +271,9 @@ class ColumnPredicate {
 
   // The exclusive upper bound value if this is a Range predicate.
   const void* upper_;
+
+  // The list of values to check column against if this is an InList predicate.
+  std::vector<const void*> values_;
 };
 
 // Compares predicates according to selectivity. Predicates that match fewer

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 2e4914f..8e0d4aa 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -299,11 +299,18 @@ message ColumnPredicatePB {
     optional bytes value = 1;
   }
 
+  message InList {
+    // A list of values for the field. See comment in Range for notes on
+    // the encoding.
+    repeated bytes values = 1;
+  }
+
   message IsNotNull {}
 
   oneof predicate {
     Range range = 2;
     Equality equality = 3;
     IsNotNull is_not_null = 4;
+    InList in_list = 5;
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/key_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.cc b/src/kudu/common/key_util.cc
index 357744e..8ee6eb3 100644
--- a/src/kudu/common/key_util.cc
+++ b/src/kudu/common/key_util.cc
@@ -140,11 +140,11 @@ int PushUpperBoundKeyPredicates(ColIdxIter first,
 
   const Schema& schema = *CHECK_NOTNULL(row->schema());
   int pushed_predicates = 0;
-  // Tracks whether the last pushed predicate is an equality predicate.
+  // Tracks whether the last pushed predicate is an equality or InList 
predicate.
   const ColumnPredicate* final_predicate = nullptr;
 
   // Step 1: copy predicates into the row in key column order, stopping after
-  // the first range predicate.
+  // the first range or missing predicate.
 
   bool break_loop = false;
   for (auto col_idx_it = first; !break_loop && col_idx_it < last; 
std::advance(col_idx_it, 1)) {
@@ -173,6 +173,14 @@ int PushUpperBoundKeyPredicates(ColIdxIter first,
       case PredicateType::IsNotNull:
         break_loop = true;
         break;
+      case PredicateType::InList:
+        // Since the InList predicate is a sorted vector of values, the last
+        // value provides an inclusive upper bound that can be pushed.
+        DCHECK(!predicate->raw_values().empty());
+        memcpy(row->mutable_cell_ptr(*col_idx_it), 
predicate->raw_values().back(), size);
+        pushed_predicates++;
+        final_predicate = predicate;
+        break;
       case PredicateType::None:
         LOG(FATAL) << "NONE predicate can not be pushed into key";
     }
@@ -181,9 +189,10 @@ int PushUpperBoundKeyPredicates(ColIdxIter first,
   // If no predicates were pushed, no need to do any more work.
   if (pushed_predicates == 0) { return 0; }
 
-  // Step 2: If the final predicate is an equality predicate, increment the
-  // key to convert it to an exclusive upper bound.
-  if (final_predicate->predicate_type() == PredicateType::Equality) {
+  // Step 2: If the final predicate is an equality predicate or an InList 
predicate,
+  // increment the key to convert it to an exclusive upper bound.
+  if (final_predicate->predicate_type() == PredicateType::Equality
+      || final_predicate->predicate_type() == PredicateType::InList) {
     if (!IncrementKey(first, std::next(first, pushed_predicates), row, arena)) 
{
       // If the increment fails then this bound is is not constraining the 
keyspace.
       return 0;
@@ -228,6 +237,13 @@ int PushLowerBoundKeyPredicates(ColIdxIter first,
       case PredicateType::IsNotNull:
         break_loop = true;
         break;
+      case PredicateType::InList:
+        // Since the InList predicate is a sorted vector of values, the first
+        // value provides an inclusive lower bound that can be pushed.
+        DCHECK(!predicate->raw_values().empty());
+        memcpy(row->mutable_cell_ptr(*col_idx_it), 
predicate->raw_values().front(), size);
+        pushed_predicates++;
+        break;
       case PredicateType::None:
         LOG(FATAL) << "NONE predicate can not be pushed into key";
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/scan_spec-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/scan_spec-test.cc 
b/src/kudu/common/scan_spec-test.cc
index 1bfa404..79f6486 100644
--- a/src/kudu/common/scan_spec-test.cc
+++ b/src/kudu/common/scan_spec-test.cc
@@ -70,6 +70,21 @@ class TestScanSpec : public KuduTest {
     }
   }
 
+  // Adds an IN-list predicate on column 'col' to 'spec'. Each element of
+  // 'values' is copied into arena_ so the cell pointers handed to the
+  // predicate outlive this call.
+  template<class T>
+  void AddInPredicate(ScanSpec* spec, StringPiece col, const vector<T>& values) {
+    int idx = schema_.find_column(col);
+    CHECK(idx != Schema::kColumnNotFound);
+
+    vector<const void*> copied_values;
+    copied_values.reserve(values.size());
+    // Bind as 'const T&' and size by sizeof(T): for vector<bool> the element
+    // reference is a proxy object, and sizeof of the proxy would not be the
+    // size of the cell value.
+    for (const T& val : values) {
+      void* val_void = arena_.AllocateBytes(sizeof(T));
+      memcpy(val_void, &val, sizeof(T));
+      copied_values.push_back(val_void);
+    }
+
+    spec->AddPredicate(ColumnPredicate::InList(schema_.column(idx), &copied_values));
+  }
+
   // Set the lower bound of the spec to the provided row. The row must outlive
   // the spec.
   void SetLowerBound(ScanSpec* spec, const KuduPartialRow& row) {
@@ -320,6 +335,34 @@ TEST_F(CompositeIntKeysTest, TestIsNotNullPushdown) {
   EXPECT_EQ("`a` IS NOT NULL", spec.ToString(schema_));
 }
 
+// Test that IN list predicates get pushed into the primary key bounds.
+TEST_F(CompositeIntKeysTest, TestInListPushdown) {
+  ScanSpec spec;
+  AddInPredicate<int8_t>(&spec, "a", { 0, 10 });
+  AddInPredicate<int8_t>(&spec, "b", { 50, 100 });
+  SCOPED_TRACE(spec.ToString(schema_));
+  spec.OptimizeScan(schema_, &arena_, &pool_, true);
+  // The lower bound uses the smallest value of each IN list. The upper bound
+  // uses the largest values and is then incremented to become exclusive
+  // (b=100 becomes b=101). The IN list predicates are kept after pushdown,
+  // since a single key range cannot express the full IN list constraint.
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=10, int8 b=101, int8 c=-128) AND "
+            "`a` IN (0, 10) AND `b` IN (50, 100)",
+            spec.ToString(schema_));
+}
+
+// Test that IN list predicates mixed with range predicates get pushed into the
+// primary key bounds.
+TEST_F(CompositeIntKeysTest, TestInListPushdownWithRange) {
+  ScanSpec spec;
+  AddPredicate<int8_t>(&spec, "a", GE, 10);
+  AddPredicate<int8_t>(&spec, "a", LE, 100);
+  AddInPredicate<int8_t>(&spec, "b", { 50, 100 });
+  SCOPED_TRACE(spec.ToString(schema_));
+  spec.OptimizeScan(schema_, &arena_, &pool_, true);
+  // The range on 'a' supplies a=10 for the lower bound and the exclusive
+  // a=101 for the upper bound. Upper-bound pushdown stops after the range
+  // predicate, so 'b' only contributes to the lower bound (b=50). The range
+  // predicate is removed once pushed, while the IN list on 'b' is retained.
+  EXPECT_EQ("PK >= (int8 a=10, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=101, int8 b=-128, int8 c=-128) AND "
+            "`b` IN (50, 100)",
+            spec.ToString(schema_));
+}
+
 // Tests that a scan spec without primary key bounds will not have predicates
 // after optimization.
 TEST_F(CompositeIntKeysTest, TestLiftPrimaryKeyBounds_NoBounds) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/scan_spec.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc
index d8469e0..7beea46 100644
--- a/src/kudu/common/scan_spec.cc
+++ b/src/kudu/common/scan_spec.cc
@@ -150,12 +150,16 @@ void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const 
Schema& schema,
          col_idx < max(lower_bound_predicates_pushed, 
upper_bound_predicates_pushed);
          col_idx++) {
       const string& column = schema.column(col_idx).name();
-      PredicateType type = FindOrDie(predicates_, 
schema.column(col_idx).name()).predicate_type();
+      PredicateType type = FindOrDie(predicates_, column).predicate_type();
       if (type == PredicateType::Equality) {
         RemovePredicate(column);
       } else if (type == PredicateType::Range) {
         RemovePredicate(column);
         break;
+      } else if (type == PredicateType::InList) {
+        // InList predicates should not be removed as the full constraints 
imposed by an InList
+        // cannot be translated into only a single set of lower and upper 
bound primary keys
+        break;
       } else {
         LOG(FATAL) << "Can not remove unknown predicate type";
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/wire_protocol-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol-test.cc 
b/src/kudu/common/wire_protocol-test.cc
index 502b6e4..aeaa6b2 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -15,7 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <vector>
+
+#include <boost/optional.hpp>
 #include <gtest/gtest.h>
+#include "kudu/common/column_predicate.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
@@ -25,6 +29,8 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using std::vector;
+
 namespace kudu {
 
 class WireProtocolTest : public KuduTest {
@@ -330,4 +336,61 @@ TEST_F(WireProtocolTest, TestColumnDefaultValue) {
   ASSERT_EQ(write_default_u32, *static_cast<const uint32_t 
*>(col5fpb.write_default_value()));
 }
 
+// Tests round-tripping IN list predicates through ColumnPredicatePB, including
+// simplification on decode and rejection of malformed values.
+TEST_F(WireProtocolTest, TestColumnPredicateInList) {
+  ColumnSchema col1("col1", INT32);
+  vector<ColumnSchema> cols = { col1 };
+  Schema schema(cols, 1);
+  Arena arena(1024, 1024 * 1024);
+  boost::optional<ColumnPredicate> predicate;
+
+  { // col1 IN (5, 6, 10)
+    int32_t five = 5;
+    int32_t six = 6;
+    int32_t ten = 10;
+    vector<const void*> values { &five, &six, &ten };
+
+    kudu::ColumnPredicate cp = kudu::ColumnPredicate::InList(col1, &values);
+    ColumnPredicatePB pb;
+    ASSERT_NO_FATAL_FAILURE(ColumnPredicateToPB(cp, &pb));
+
+    ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+    ASSERT_EQ(PredicateType::InList, predicate->predicate_type());
+    ASSERT_EQ(3, predicate->raw_values().size());
+    // The decoded predicate must match the original value-for-value.
+    ASSERT_TRUE(cp == *predicate);
+  }
+
+  { // col1 IN (0, 0)
+    // We can't construct a single element IN list directly since it would be
+    // simplified to an equality predicate, so we hack around it by directly
+    // constructing it as a protobuf message. Decoding removes the duplicate,
+    // and the remaining single value is simplified to an equality predicate.
+    ColumnPredicatePB pb;
+    pb.set_column("col1");
+    *pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4);
+    *pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4);
+
+    ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+    ASSERT_EQ(PredicateType::Equality, predicate->predicate_type());
+  }
+
+  { // col1 IN ()
+    ColumnPredicatePB pb;
+    pb.set_column("col1");
+    pb.mutable_in_list();
+
+    ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+    ASSERT_EQ(PredicateType::None, predicate->predicate_type());
+  }
+
+  { // IN list corruption: a 1-byte value for an INT32 column (presumably
+    // rejected because its size doesn't match the cell size).
+    ColumnPredicatePB pb;
+    pb.set_column("col1");
+    *pb.mutable_in_list()->mutable_values()->Add() = string("\0", 1);
+
+    ASSERT_TRUE(ColumnPredicateFromPB(schema, &arena, pb, &predicate).IsInvalidArgument());
+  }
+}
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9b50bd58/src/kudu/common/wire_protocol.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 8815a2c..6b09956 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -389,6 +389,13 @@ void ColumnPredicateToPB(const ColumnPredicate& predicate,
       pb->mutable_is_not_null();
       return;
     };
+    case PredicateType::InList: {
+      auto* values = pb->mutable_in_list()->mutable_values();
+      for (const void* value : predicate.raw_values()) {
+        CopyPredicateBoundToPB(predicate.column(), value, values->Add());
+      }
+      return;
+    };
     case PredicateType::None: LOG(FATAL) << "None predicate may not be 
converted to protobuf";
   }
   LOG(FATAL) << "unknown predicate type";
@@ -439,6 +446,17 @@ Status ColumnPredicateFromPB(const Schema& schema,
       *predicate = ColumnPredicate::Equality(col, value);
       break;
     };
+    case ColumnPredicatePB::kInList: {
+      const auto& inlist = pb.in_list();
+      vector<const void*> values;
+      for (const string& pb_value : inlist.values()) {
+        const void* value = nullptr;
+        RETURN_NOT_OK(CopyPredicateBoundFromPB(col, pb_value, arena, &value));
+        values.push_back(value);
+      }
+      *predicate = ColumnPredicate::InList(col, &values);
+      break;
+    };
     case ColumnPredicatePB::kIsNotNull: {
       ColumnPredicate p = ColumnPredicate::IsNotNull(col);
       *predicate = ColumnPredicate::IsNotNull(col);

Reply via email to