IMPALA-5286/IMPALA-5283: Kudu column name case cleanup

Impala is case insensitive for column names and generally deals
with them in all lower case. Kudu is case sensitive. This can
lead to problems when a table is created externally in Kudu
with a column name with upper case letters.

This patch solves the problem by having KuduColumn always store
its name in lower case, so that general Impala code that has been
written expecting lower cased column names can use Column.getName()
safely.

It also adds the method KuduColumn.getKuduName(), which returns
the column name with the exact casing it has in Kudu. Any code that
passes column names into the Kudu API must call this method first
to get the correct column name.

There are four specific situations fixed by this patch:
- When ordering on a Kudu column, the Analyzer would create
  two SlotDescriptors that point to the same column because
  registerSlotRef() was being called with inconsistent casing.
  It is now always called with the lower cased names.
- 'ADD RANGE PARTITION' would fail to find the range partition
  column if it wasn't all lower case in Kudu.
- 'ALTER TABLE DROP COLUMN' and 'ALTER TABLE CHANGE' only worked
  if the column name was specified in Kudu case.
- 'CREATE EXTERNAL TABLE' called on a Kudu table with column names
  that differ only in case now returns an error, since Impala has
  no way of handling this situation.

Testing:
- Added e2e tests in test_kudu.py.
- Manually edited functional_kudu to change column names to have
  mixed casing and ran the kudu tests.

Change-Id: I14aba88510012174716691b9946e1c7d54d01b44
Reviewed-on: http://gerrit.cloudera.org:8080/6902
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7f381798
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7f381798
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7f381798

Branch: refs/heads/master
Commit: 7f3817982ff7968193e77abff051a52fc6e0d8cf
Parents: d6abb29
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Fri May 12 12:23:42 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Sat Jun 17 01:16:00 2017 +0000

----------------------------------------------------------------------
 common/thrift/CatalogObjects.thrift             |  3 +
 .../org/apache/impala/analysis/Analyzer.java    |  2 +
 .../org/apache/impala/analysis/ColumnDef.java   |  2 +-
 .../java/org/apache/impala/catalog/Column.java  |  1 +
 .../org/apache/impala/catalog/KuduColumn.java   | 10 ++-
 .../org/apache/impala/planner/KuduScanNode.java | 11 +--
 .../impala/service/KuduCatalogOpExecutor.java   | 23 +++++-
 .../java/org/apache/impala/util/KuduUtil.java   |  7 +-
 tests/query_test/test_kudu.py                   | 77 ++++++++++++++++++++
 9 files changed, 122 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift 
b/common/thrift/CatalogObjects.thrift
index 51ab922..45f103c 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -190,6 +190,7 @@ struct TPartitionStats {
 }
 
 struct TColumn {
+  // The column name, in lower case.
   1: required string columnName
   2: required Types.TColumnType columnType
   3: optional string comment
@@ -213,6 +214,8 @@ struct TColumn {
   14: optional THdfsCompression compression
   15: optional Exprs.TExpr default_value
   16: optional i32 block_size
+  // The column name, in the case that it appears in Kudu.
+  17: optional string kudu_column_name
 }
 
 // Represents an HDFS file in a partition.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java 
b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 7fd6cf7..a3e16c4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -955,6 +955,8 @@ public class Analyzer {
     // SlotRefs with a scalar type are registered against the slot's
     // fully-qualified lowercase path.
     String key = slotPath.toString();
+    Preconditions.checkState(key.equals(key.toLowerCase()),
+        "Slot paths should be lower case: " + key);
     SlotDescriptor existingSlotDesc = slotPathMap_.get(key);
     if (existingSlotDesc != null) return existingSlotDesc;
     SlotDescriptor result = addSlotDescriptor(slotPath.getRootDesc());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java 
b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
index 844d70f..fa20d71 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
@@ -353,7 +353,7 @@ public class ColumnDef {
     Integer blockSize =
         blockSize_ == null ? null : (int) ((NumericLiteral) 
blockSize_).getIntValue();
     KuduUtil.setColumnOptions(col, isPrimaryKey_, isNullable_, encoding_,
-        compression_, outputDefaultValue_, blockSize);
+        compression_, outputDefaultValue_, blockSize, colName_);
     if (comment_ != null) col.setComment(comment_);
     return col;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/fe/src/main/java/org/apache/impala/catalog/Column.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Column.java 
b/fe/src/main/java/org/apache/impala/catalog/Column.java
index 6e67845..0f4086c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Column.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Column.java
@@ -52,6 +52,7 @@ public class Column {
   }
 
   public Column(String name, Type type, String comment, int position) {
+    Preconditions.checkState(name.equals(name.toLowerCase()));
     name_ = name;
     type_ = type;
     comment_ = comment;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java 
b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
index dda63c1..75699ab 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
@@ -40,6 +40,8 @@ import org.apache.kudu.ColumnSchema;
  *  - desired block size
  */
 public class KuduColumn extends Column {
+  // The name of the column as it appears in Kudu, i.e. not converted to lower 
case.
+  private final String kuduName_;
   private final boolean isKey_;
   private final boolean isNullable_;
   private final Encoding encoding_;
@@ -56,9 +58,10 @@ public class KuduColumn extends Column {
   private KuduColumn(String name, Type type, boolean isKey, boolean isNullable,
       Encoding encoding, CompressionAlgorithm compression, LiteralExpr 
defaultValue,
       int blockSize, String comment, int position) {
-    super(name, type, comment, position);
+    super(name.toLowerCase(), type, comment, position);
     Preconditions.checkArgument(defaultValue == null || type == 
defaultValue.getType()
         || (type.isTimestamp() && defaultValue.getType().isIntegerType()));
+    kuduName_ = name;
     isKey_ = isKey;
     isNullable_ = isNullable;
     encoding_ = encoding;
@@ -108,11 +111,12 @@ public class KuduColumn extends Column {
     }
     int blockSize = 0;
     if (column.isSetBlock_size()) blockSize = column.getBlock_size();
-    return new KuduColumn(column.getColumnName(), columnType, 
column.isIs_key(),
+    return new KuduColumn(column.getKudu_column_name(), columnType, 
column.isIs_key(),
         column.isIs_nullable(), encoding, compression, defaultValue, 
blockSize, null,
         position);
   }
 
+  public String getKuduName() { return kuduName_; }
   public boolean isKey() { return isKey_; }
   public boolean isNullable() { return isNullable_; }
   public Encoding getEncoding() { return encoding_; }
@@ -147,7 +151,7 @@ public class KuduColumn extends Column {
   public TColumn toThrift() {
     TColumn colDesc = new TColumn(name_, type_.toThrift());
     KuduUtil.setColumnOptions(colDesc, isKey_, isNullable_, encoding_, 
compression_,
-        defaultValue_, blockSize_);
+        defaultValue_, blockSize_, kuduName_);
     if (comment_ != null) colDesc.setComment(comment_);
     colDesc.setCol_stats(getStats().toThrift());
     colDesc.setPosition(position_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index c03307e..7c6c5e3 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -35,6 +35,7 @@ import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -149,7 +150,7 @@ public class KuduScanNode extends ScanNode {
       throws ImpalaRuntimeException {
     Schema tableSchema = rpcTable.getSchema();
     for (SlotDescriptor desc: getTupleDesc().getSlots()) {
-      String colName = desc.getColumn().getName();
+      String colName = ((KuduColumn) desc.getColumn()).getKuduName();
       Type colType = desc.getColumn().getType();
       ColumnSchema kuduCol = null;
       try {
@@ -235,7 +236,7 @@ public class KuduScanNode extends ScanNode {
       org.apache.kudu.client.KuduTable rpcTable) {
     List<String> projectedCols = Lists.newArrayList();
     for (SlotDescriptor desc: getTupleDesc().getSlotsOrderedByOffset()) {
-      projectedCols.add(desc.getColumn().getName());
+      projectedCols.add(((KuduColumn) desc.getColumn()).getKuduName());
     }
     KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable);
     tokenBuilder.setProjectedColumnNames(projectedCols);
@@ -350,7 +351,7 @@ public class KuduScanNode extends ScanNode {
     // Cannot push predicates with null literal values (KUDU-1595).
     if (literal instanceof NullLiteral) return false;
 
-    String colName = ref.getDesc().getColumn().getName();
+    String colName = ((KuduColumn) ref.getDesc().getColumn()).getKuduName();
     ColumnSchema column = table.getSchema().getColumn(colName);
     KuduPredicate kuduPredicate = null;
     switch (literal.getType().getPrimitiveType()) {
@@ -438,7 +439,7 @@ public class KuduScanNode extends ScanNode {
       values.add(value);
     }
 
-    String colName = ref.getDesc().getColumn().getName();
+    String colName = ((KuduColumn) ref.getDesc().getColumn()).getKuduName();
     ColumnSchema column = table.getSchema().getColumn(colName);
     kuduPredicates_.add(KuduPredicate.newInListPredicate(column, values));
     kuduConjuncts_.add(predicate);
@@ -461,7 +462,7 @@ public class KuduScanNode extends ScanNode {
     if (!(predicate.getChild(0) instanceof SlotRef)) return false;
     SlotRef ref = (SlotRef) predicate.getChild(0);
 
-    String colName = ref.getDesc().getColumn().getName();
+    String colName = ((KuduColumn) ref.getDesc().getColumn()).getKuduName();
     ColumnSchema column = table.getSchema().getColumn(colName);
     KuduPredicate kuduPredicate = null;
     if (predicate.isNotNull()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index ed3b5af..44f1378 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.TableNotFoundException;
@@ -52,6 +53,7 @@ import org.apache.log4j.Logger;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * This is a helper for the CatalogOpExecutor to provide Kudu related DDL 
functionality
@@ -251,7 +253,13 @@ public class KuduCatalogOpExecutor {
       // Replace the columns in the Metastore table with the columns from the 
recently
       // accessed Kudu schema.
       cols.clear();
+      Set<String> lowerCaseColNames = Sets.newHashSet();
       for (ColumnSchema colSchema : kuduTable.getSchema().getColumns()) {
+        if (!lowerCaseColNames.add(colSchema.getName().toLowerCase())) {
+          throw new ImpalaRuntimeException(String.format(
+              "Error loading Kudu table: Impala does not support column names 
that " +
+              "differ only in casing '%s'", colSchema.getName()));
+        }
         Type type = KuduUtil.toImpalaType(colSchema.getType());
         cols.add(new FieldSchema(colSchema.getName(), 
type.toSql().toLowerCase(), null));
       }
@@ -339,13 +347,20 @@ public class KuduCatalogOpExecutor {
 
   private static List<Pair<PartialRow, RangePartitionBound>> 
getRangePartitionBounds(
       TRangePartition rangePartition, KuduTable tbl) throws 
ImpalaRuntimeException {
+    List<String> rangePartitioningColNames = 
tbl.getRangePartitioningColNames();
+    List<String> rangePartitioningKuduColNames =
+      Lists.newArrayListWithCapacity(rangePartitioningColNames.size());
+    for (String colName : rangePartitioningColNames) {
+      
rangePartitioningKuduColNames.add(((KuduColumn)tbl.getColumn(colName)).getKuduName());
+    }
     return getRangePartitionBounds(rangePartition, tbl.getKuduSchema(),
-        tbl.getRangePartitioningColNames());
+        rangePartitioningKuduColNames);
   }
 
   /**
    * Returns the bounds of a range partition in two <PartialRow, 
RangePartitionBound>
    * pairs to be used in Kudu API calls for ALTER and CREATE TABLE statements.
+   * 'rangePartitioningColNames' must be specified in Kudu case.
    */
   private static List<Pair<PartialRow, RangePartitionBound>> 
getRangePartitionBounds(
       TRangePartition rangePartition, Schema schema,
@@ -388,8 +403,9 @@ public class KuduCatalogOpExecutor {
   public static void dropColumn(KuduTable tbl, String colName)
       throws ImpalaRuntimeException {
     Preconditions.checkState(!Strings.isNullOrEmpty(colName));
+    KuduColumn col = (KuduColumn) tbl.getColumn(colName);
     AlterTableOptions alterTableOptions = new AlterTableOptions();
-    alterTableOptions.dropColumn(colName);
+    alterTableOptions.dropColumn(col.getKuduName());
     String errMsg = String.format("Error dropping column %s from " +
         "Kudu table %s", colName, tbl.getName());
     alterKuduTable(tbl, alterTableOptions, errMsg);
@@ -402,8 +418,9 @@ public class KuduCatalogOpExecutor {
       throws ImpalaRuntimeException {
     Preconditions.checkState(!Strings.isNullOrEmpty(oldName));
     Preconditions.checkNotNull(newCol);
+    KuduColumn col = (KuduColumn) tbl.getColumn(oldName);
     AlterTableOptions alterTableOptions = new AlterTableOptions();
-    alterTableOptions.renameColumn(oldName, newCol.getColumnName());
+    alterTableOptions.renameColumn(col.getKuduName(), newCol.getColumnName());
     String errMsg = String.format("Error renaming column %s to %s " +
         "for Kudu table %s", oldName, newCol.getColumnName(), tbl.getName());
     alterKuduTable(tbl, alterTableOptions, errMsg);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java 
b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index c615d06..be98cf6 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -83,6 +83,7 @@ public class KuduUtil {
 
   /**
    * Creates a PartialRow from a list of range partition boundary values.
+   * 'rangePartitionColumns' must be specified in Kudu case.
    */
   private static PartialRow parseRangePartitionBoundaryValues(Schema schema,
       List<String> rangePartitionColumns, List<TExpr> boundaryValues)
@@ -104,7 +105,7 @@ public class KuduUtil {
    * table. The range-partition bound consists of a PartialRow with the 
boundary
    * values and a RangePartitionBound indicating if the bound is inclusive or 
exclusive.
    * Throws an ImpalaRuntimeException if an error occurs while parsing the 
boundary
-   * values.
+   * values. 'rangePartitionColumns' must be specified in Kudu case.
    */
   public static Pair<PartialRow, RangePartitionBound> buildRangePartitionBound(
       Schema schema, List<String> rangePartitionColumns, List<TExpr> 
boundaryValues,
@@ -312,7 +313,7 @@ public class KuduUtil {
 
   public static TColumn setColumnOptions(TColumn column, boolean isKey,
       Boolean isNullable, Encoding encoding, CompressionAlgorithm compression,
-      Expr defaultValue, Integer blockSize) {
+      Expr defaultValue, Integer blockSize, String kuduName) {
     column.setIs_key(isKey);
     if (isNullable != null) column.setIs_nullable(isNullable);
     try {
@@ -330,6 +331,8 @@ public class KuduUtil {
       column.setDefault_value(defaultValue.treeToThrift());
     }
     if (blockSize != null) column.setBlock_size(blockSize);
+    Preconditions.checkNotNull(kuduName);
+    column.setKudu_column_name(kuduName);
     return column;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f381798/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 8f98e3e..ed9c37d 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -555,6 +555,83 @@ class TestCreateExternalTable(KuduTestSuite):
       if kudu_client.table_exists(name):
         kudu_client.delete_table(name)
 
+  def test_column_name_case(self, cursor, kudu_client, unique_database):
+    """IMPALA-5286: Tests that an external Kudu table that was created with a 
column name
+       containing upper case letters is handled correctly."""
+    table_name = '%s.kudu_external_test' % unique_database
+    if kudu_client.table_exists(table_name):
+      kudu_client.delete_table(table_name)
+
+    schema_builder = SchemaBuilder()
+    key_col = 'Key'
+    schema_builder.add_column(key_col, INT64).nullable(False).primary_key()
+    schema = schema_builder.build()
+    partitioning = Partitioning().set_range_partition_columns([key_col])\
+        .add_range_partition([1], [10])
+
+    try:
+      kudu_client.create_table(table_name, schema, partitioning)
+
+      props = "tblproperties('kudu.table_name' = '%s')" % table_name
+      cursor.execute("create external table %s stored as kudu %s" % 
(table_name, props))
+
+      # Perform a variety of operations on the table.
+      cursor.execute("insert into %s (kEy) values (5), (1), (4)" % table_name)
+      cursor.execute("select keY from %s where KeY %% 2 = 0" % table_name)
+      assert cursor.fetchall() == [(4, )]
+      cursor.execute("select * from %s order by kEY" % (table_name))
+      assert cursor.fetchall() == [(1, ), (4, ), (5, )]
+      cursor.execute("alter table %s add range partition 11 < values < 20" % 
table_name)
+
+      new_key = "KEY2"
+      cursor.execute("alter table %s change KEy %s bigint" % (table_name, 
new_key))
+      val_col = "vaL"
+      cursor.execute("alter table %s add columns (%s bigint)" % (table_name, 
val_col))
+
+      cursor.execute("describe %s" % table_name)
+      results = cursor.fetchall()
+      # 'describe' should print the column name in lower case.
+      assert new_key.lower() in results[0]
+      assert val_col.lower() in results[1]
+
+      cursor.execute("alter table %s drop column Val" % table_name);
+      cursor.execute("describe %s" % table_name)
+      assert len(cursor.fetchall()) == 1
+
+      cursor.execute("alter table %s drop range partition 11 < values < 20" % 
table_name)
+    finally:
+      if kudu_client.table_exists(table_name):
+        kudu_client.delete_table(table_name)
+
+  def test_conflicting_column_name(self, cursor, kudu_client, unique_database):
+    """IMPALA-5283: Tests that loading an external Kudu table that was created 
with column
+       names that differ only in case results in an error."""
+    table_name = '%s.kudu_external_test' % unique_database
+    if kudu_client.table_exists(table_name):
+      kudu_client.delete_table(table_name)
+
+    schema_builder = SchemaBuilder()
+    col0 = 'col'
+    schema_builder.add_column(col0, INT64).nullable(False).primary_key()
+    col1 = 'COL'
+    schema_builder.add_column(col1, INT64)
+    schema = schema_builder.build()
+    partitioning = Partitioning().set_range_partition_columns([col0])\
+        .add_range_partition([1], [10])
+
+    try:
+      kudu_client.create_table(table_name, schema, partitioning)
+
+      props = "tblproperties('kudu.table_name' = '%s')" % table_name
+      cursor.execute("create external table %s stored as kudu %s" % 
(table_name, props))
+      assert False, 'create table should have resulted in an exception'
+    except Exception as e:
+      assert 'Error loading Kudu table: Impala does not support column names 
that ' \
+          + 'differ only in casing' in str(e)
+    finally:
+      if kudu_client.table_exists(table_name):
+        kudu_client.delete_table(table_name)
+
 class TestShowCreateTable(KuduTestSuite):
 
   def assert_show_create_equals(self, cursor, create_sql, show_create_sql):

Reply via email to