This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit e8ccca82be80611e79178c1ab1acc75c5d62b2e0
Author: Kurt Deschler <[email protected]>
AuthorDate: Tue Sep 19 11:48:10 2023 -0500

    HIVE-27874: Support datatype conversion on fetch threads (Kurt Deschler, 
reviewed by Attila Turoczy, Denys Kuzmenko)
    
    This patch provides a mechanism to move expensive datatype conversions
    (e.g., Timestamp) to the fetch threads where the work can be done in
    parallel. This can substantially improve performance in cases where the
    client thread is the bottleneck and resources are available for multiple
    fetch threads. Implementation is in the form of ConvertedResultSet, which is
    agnostic to the underlying protocol result and can be dynamically
    substituted into the fetch path.
    
    Closes #4902
---
 .../org/apache/hive/jdbc/ConvertedResultSet.java   | 135 ++++++++
 .../org/apache/hive/jdbc/HiveBaseResultSet.java    |   9 +-
 .../org/apache/hive/jdbc/HiveQueryResultSet.java   |  13 +
 jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java | 346 ++++++++++-----------
 4 files changed, 329 insertions(+), 174 deletions(-)

diff --git a/jdbc/src/java/org/apache/hive/jdbc/ConvertedResultSet.java 
b/jdbc/src/java/org/apache/hive/jdbc/ConvertedResultSet.java
new file mode 100644
index 00000000000..2edeafd44c1
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/ConvertedResultSet.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.function.UnaryOperator;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
+
+import org.apache.hadoop.hive.serde2.thrift.Type;
+
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
+
+/**
+ * ConvertedResultSet
+ */
+public class ConvertedResultSet implements RowSet {
+
+  private final List<Object[]> rows;
+  private final int numColumns;
+
+  static final Map<Type, UnaryOperator<Object>> convertFuncs = new 
EnumMap<Type, UnaryOperator<Object>>(Type.class) {{
+    put(Type.BINARY_TYPE, o -> (o instanceof String) ? ((String) o).getBytes() 
: 0);
+    put(Type.TIMESTAMP_TYPE, o -> Timestamp.valueOf((String) o));
+    put(Type.TIMESTAMPLOCALTZ_TYPE, o -> TimestampTZUtil.parse((String) o));
+    put(Type.DECIMAL_TYPE, o -> new BigDecimal((String) o));
+    put(Type.DATE_TYPE, o -> Date.valueOf((String) o));
+    put(Type.INTERVAL_YEAR_MONTH_TYPE, o -> 
HiveIntervalYearMonth.valueOf((String) o));
+    put(Type.INTERVAL_DAY_TIME_TYPE, o -> HiveIntervalDayTime.valueOf((String) 
o));
+  }};
+
+
+  public ConvertedResultSet(RowSet rowSet, TableSchema schema) {
+    rows = new ArrayList<>();
+    numColumns = rowSet.numColumns();
+    Iterator<Object[]> srcIter = rowSet.iterator();
+    final int nCols = schema.getSize();
+    
+    UnaryOperator[] colConvert = new UnaryOperator[nCols];
+    for (int i = 0; i < nCols; ++i) {
+      colConvert[i] = 
convertFuncs.get(schema.getColumnDescriptorAt(i).getType());
+    }
+    while (srcIter.hasNext()) {
+      Object[] srcRow = srcIter.next();
+      Object[] dstRow = new Object[srcRow.length];
+      for (int i = 0; i < nCols; ++i) {
+        if (colConvert[i] != null && srcRow[i] != null) {
+          dstRow[i] = colConvert[i].apply(srcRow[i]);
+        } else {
+          dstRow[i] = srcRow[i];
+        }
+      }
+      rows.add(dstRow);
+    }
+  }
+
+  @Override
+  public ConvertedResultSet addRow(Object[] fields) {
+    throw new UnsupportedOperationException("addRow");
+  }
+
+  @Override
+  public int numColumns() {
+    return numColumns;
+  }
+
+  @Override
+  public int numRows() {
+    return rows.size();
+  }
+
+  public ConvertedResultSet extractSubset(int maxRows) {
+    throw new UnsupportedOperationException("extractSubset");
+  }
+
+  @Override
+  public long getStartOffset() {
+    throw new UnsupportedOperationException("getStartOffset");
+  }
+
+  @Override
+  public void setStartOffset(long startOffset) {
+    throw new UnsupportedOperationException("setStartOffset");
+  }
+
+  @Override
+  public TRowSet toTRowSet() {
+    throw new UnsupportedOperationException("toTRowSet");
+  }
+
+  @Override
+  public Iterator<Object[]> iterator() {
+    return new Iterator<Object[]>() {
+
+      final Iterator<Object[]> iterator = rows.iterator();
+
+      @Override
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      @Override
+      public Object[] next() {
+        return iterator.next();
+      }
+    };
+  }
+}
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java 
b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
index 3988b02fd32..f92df88bdce 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
@@ -63,6 +63,7 @@ public abstract class HiveBaseResultSet implements ResultSet {
   protected Statement statement = null;
   protected SQLWarning warningChain = null;
   protected boolean wasNull = false;
+  protected boolean useConvertedResultSet = false;
   protected Object[] row;
   protected List<String> columnNames;
   protected List<String> normalizedColumnNames;
@@ -523,8 +524,14 @@ public abstract class HiveBaseResultSet implements 
ResultSet {
       throw new SQLException("Invalid columnIndex: " + columnIndex);
     }
 
-    final Type columnType = getSchema().getColumnDescriptorAt(columnIndex - 
1).getType();
     final Object value = row[columnIndex - 1];
+    if (useConvertedResultSet) {
+      // Conversion has been done already so just return the object
+      wasNull = value == null;
+      return value;
+    }
+
+    final Type columnType = getSchema().getColumnDescriptorAt(columnIndex - 
1).getType();
 
     try {
       final Object evaluated = evaluate(columnType, value);
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java 
b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
index 874dfb55301..9cbddf01d8c 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
@@ -231,6 +231,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     this.protocol = builder.getProtocolVersion();
     initEmptyIterator();
     resultQueue = new ArrayBlockingQueue<>(Math.max(fetchThreads, 1));
+    // The fetch path is usually the bottleneck with a single-threaded fetch
+    // so only convert results there if there are multiple fetch threads.
+    if (fetchThreads > 1) {
+      useConvertedResultSet = true;
+    }
   }
 
   /**
@@ -404,6 +409,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
           gotLastBatch.set(true);
         }
         fetchedRows = RowSetFactory.create(results, protocol);
+        if (useConvertedResultSet) {
+          fetchedRows = new ConvertedResultSet(fetchedRows, getSchema());
+        }
+
         nextStartRow.set(results.getStartRowOffset() + 1 + 
fetchedRows.numRows());
       } catch (TException ex) {
         throw new SQLException("Error retrieving next row", ex);
@@ -466,6 +475,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
                 }
                 result.hasMoreRows = hasMoreRows;
                 result.fetchedRows = RowSetFactory.create(results, protocol);
+                if (useConvertedResultSet) {
+                  result.fetchedRows =
+                      new ConvertedResultSet(result.fetchedRows, getSchema());
+                }
                 result.startRow = results.getStartRowOffset() + 1;
                 if (hasStartRow.get() && result.startRow < nextStartRow.get()) 
{
                   throw new SQLException("Unexpected row offset");
diff --git a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java 
b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
index 8a9c5790025..d3b7098fa7f 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
@@ -186,52 +186,52 @@ public class JdbcColumn {
     // TODO: this would be better handled in an enum
     final String t = type.toLowerCase();
     switch (t) {
-    case "string":
-      return serdeConstants.STRING_TYPE_NAME;
-    case "varchar":
-      return serdeConstants.VARCHAR_TYPE_NAME;
-    case "char":
-      return serdeConstants.CHAR_TYPE_NAME;
-    case "float":
-      return serdeConstants.FLOAT_TYPE_NAME;
-    case "double":
-      return serdeConstants.DOUBLE_TYPE_NAME;
-    case "boolean":
-      return serdeConstants.BOOLEAN_TYPE_NAME;
-    case "tinyint":
-      return serdeConstants.TINYINT_TYPE_NAME;
-    case "smallint":
-      return serdeConstants.SMALLINT_TYPE_NAME;
-    case "int":
-      return serdeConstants.INT_TYPE_NAME;
-    case "bigint":
-      return serdeConstants.BIGINT_TYPE_NAME;
-    case "timestamp":
-      return serdeConstants.TIMESTAMP_TYPE_NAME;
-    case serdeConstants.TIMESTAMPLOCALTZ_TYPE_NAME:
-      return serdeConstants.TIMESTAMPLOCALTZ_TYPE_NAME;
-    case "date":
-      return serdeConstants.DATE_TYPE_NAME;
-    case "interval_year_month":
-      return serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME;
-    case "interval_day_time":
-      return serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME;
-    case "decimal":
-      return serdeConstants.DECIMAL_TYPE_NAME;
-    case "binary":
-      return serdeConstants.BINARY_TYPE_NAME;
-    case "void":
-      /* fall through */
-    case "null":
-      return serdeConstants.VOID_TYPE_NAME;
-    case "map":
-      return serdeConstants.MAP_TYPE_NAME;
-    case "array":
-      return serdeConstants.LIST_TYPE_NAME;
-    case "struct":
-      return serdeConstants.STRUCT_TYPE_NAME;
-    default:
-      throw new SQLException("Unrecognized column type: " + type);
+      case "string":
+        return serdeConstants.STRING_TYPE_NAME;
+      case "varchar":
+        return serdeConstants.VARCHAR_TYPE_NAME;
+      case "char":
+        return serdeConstants.CHAR_TYPE_NAME;
+      case "float":
+        return serdeConstants.FLOAT_TYPE_NAME;
+      case "double":
+        return serdeConstants.DOUBLE_TYPE_NAME;
+      case "boolean":
+        return serdeConstants.BOOLEAN_TYPE_NAME;
+      case "tinyint":
+        return serdeConstants.TINYINT_TYPE_NAME;
+      case "smallint":
+        return serdeConstants.SMALLINT_TYPE_NAME;
+      case "int":
+        return serdeConstants.INT_TYPE_NAME;
+      case "bigint":
+        return serdeConstants.BIGINT_TYPE_NAME;
+      case "timestamp":
+        return serdeConstants.TIMESTAMP_TYPE_NAME;
+      case serdeConstants.TIMESTAMPLOCALTZ_TYPE_NAME:
+        return serdeConstants.TIMESTAMPLOCALTZ_TYPE_NAME;
+      case "date":
+        return serdeConstants.DATE_TYPE_NAME;
+      case "interval_year_month":
+        return serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME;
+      case "interval_day_time":
+        return serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME;
+      case "decimal":
+        return serdeConstants.DECIMAL_TYPE_NAME;
+      case "binary":
+        return serdeConstants.BINARY_TYPE_NAME;
+      case "void":
+        /* fall through */
+      case "null":
+        return serdeConstants.VOID_TYPE_NAME;
+      case "map":
+        return serdeConstants.MAP_TYPE_NAME;
+      case "array":
+        return serdeConstants.LIST_TYPE_NAME;
+      case "struct":
+        return serdeConstants.STRUCT_TYPE_NAME;
+      default:
+        throw new SQLException("Unrecognized column type: " + type);
     }
   }
 
@@ -240,42 +240,42 @@ public class JdbcColumn {
     // according to hiveTypeToSqlType possible options are:
     int columnType = hiveTypeToSqlType(hiveType);
     switch(columnType) {
-    case Types.NULL:
-      return 4; // "NULL"
-    case Types.BOOLEAN:
-      return columnPrecision(hiveType, columnAttributes);
-    case Types.CHAR:
-    case Types.VARCHAR:
-      return columnPrecision(hiveType, columnAttributes);
-    case Types.BINARY:
-      return Integer.MAX_VALUE; // hive has no max limit for binary
-    case Types.TINYINT:
-    case Types.SMALLINT:
-    case Types.INTEGER:
-    case Types.BIGINT:
-      return columnPrecision(hiveType, columnAttributes) + 1; // allow +/-
-    case Types.DATE:
-      return 10;
-    case Types.TIMESTAMP:
-    case Types.TIMESTAMP_WITH_TIMEZONE:
-      return columnPrecision(hiveType, columnAttributes);
+      case Types.NULL:
+        return 4; // "NULL"
+      case Types.BOOLEAN:
+        return columnPrecision(hiveType, columnAttributes);
+      case Types.CHAR:
+      case Types.VARCHAR:
+        return columnPrecision(hiveType, columnAttributes);
+      case Types.BINARY:
+        return Integer.MAX_VALUE; // hive has no max limit for binary
+      case Types.TINYINT:
+      case Types.SMALLINT:
+      case Types.INTEGER:
+      case Types.BIGINT:
+        return columnPrecision(hiveType, columnAttributes) + 1; // allow +/-
+      case Types.DATE:
+        return 10;
+      case Types.TIMESTAMP:
+      case Types.TIMESTAMP_WITH_TIMEZONE:
+        return columnPrecision(hiveType, columnAttributes);
 
-    // see 
http://download.oracle.com/javase/6/docs/api/constant-values.html#java.lang.Float.MAX_EXPONENT
-    case Types.FLOAT:
-      return 24; // e.g. -(17#).e-###
-    // see 
http://download.oracle.com/javase/6/docs/api/constant-values.html#java.lang.Double.MAX_EXPONENT
-    case Types.DOUBLE:
-      return 25; // e.g. -(17#).e-####
-    case Types.DECIMAL:
-      return columnPrecision(hiveType, columnAttributes) + 2;  // '-' sign and 
'.'
-    case Types.OTHER:
-    case Types.JAVA_OBJECT:
-      return columnPrecision(hiveType, columnAttributes);
-    case Types.ARRAY:
-    case Types.STRUCT:
-      return Integer.MAX_VALUE;
-    default:
-      throw new SQLException("Invalid column type: " + columnType);
+      // see 
http://download.oracle.com/javase/6/docs/api/constant-values.html#java.lang.Float.MAX_EXPONENT
+      case Types.FLOAT:
+        return 24; // e.g. -(17#).e-###
+      // see 
http://download.oracle.com/javase/6/docs/api/constant-values.html#java.lang.Double.MAX_EXPONENT
+      case Types.DOUBLE:
+        return 25; // e.g. -(17#).e-####
+      case Types.DECIMAL:
+        return columnPrecision(hiveType, columnAttributes) + 2;  // '-' sign 
and '.'
+      case Types.OTHER:
+      case Types.JAVA_OBJECT:
+        return columnPrecision(hiveType, columnAttributes);
+      case Types.ARRAY:
+      case Types.STRUCT:
+        return Integer.MAX_VALUE;
+      default:
+        throw new SQLException("Invalid column type: " + columnType);
     }
   }
 
@@ -284,56 +284,56 @@ public class JdbcColumn {
     int columnType = hiveTypeToSqlType(hiveType);
     // according to hiveTypeToSqlType possible options are:
     switch(columnType) {
-    case Types.NULL:
-      return 0;
-    case Types.BOOLEAN:
-      return 1;
-    case Types.CHAR:
-    case Types.VARCHAR:
-      if (columnAttributes != null) {
+      case Types.NULL:
+        return 0;
+      case Types.BOOLEAN:
+        return 1;
+      case Types.CHAR:
+      case Types.VARCHAR:
+        if (columnAttributes != null) {
+          return columnAttributes.precision;
+        }
+        return Integer.MAX_VALUE; // hive has no max limit for strings
+      case Types.BINARY:
+        return Integer.MAX_VALUE; // hive has no max limit for binary
+      case Types.TINYINT:
+        return 3;
+      case Types.SMALLINT:
+        return 5;
+      case Types.INTEGER:
+        return 10;
+      case Types.BIGINT:
+        return 19;
+      case Types.FLOAT:
+        return 7;
+      case Types.DOUBLE:
+        return 15;
+      case Types.DATE:
+        return 10;
+      case Types.TIMESTAMP:
+        return 29;
+      case Types.TIMESTAMP_WITH_TIMEZONE:
+        return 31;
+      case Types.DECIMAL:
         return columnAttributes.precision;
+      case Types.OTHER:
+      case Types.JAVA_OBJECT: {
+        switch (hiveType) {
+          case INTERVAL_YEAR_MONTH_TYPE:
+            // -yyyyyyy-mm  : should be more than enough
+            return 11;
+          case INTERVAL_DAY_TIME_TYPE:
+            // -ddddddddd hh:mm:ss.nnnnnnnnn
+            return 29;
+          default:
+            return Integer.MAX_VALUE;
+        }
       }
-      return Integer.MAX_VALUE; // hive has no max limit for strings
-    case Types.BINARY:
-      return Integer.MAX_VALUE; // hive has no max limit for binary
-    case Types.TINYINT:
-      return 3;
-    case Types.SMALLINT:
-      return 5;
-    case Types.INTEGER:
-      return 10;
-    case Types.BIGINT:
-      return 19;
-    case Types.FLOAT:
-      return 7;
-    case Types.DOUBLE:
-      return 15;
-    case Types.DATE:
-      return 10;
-    case Types.TIMESTAMP:
-      return 29;
-    case Types.TIMESTAMP_WITH_TIMEZONE:
-      return 31;
-    case Types.DECIMAL:
-      return columnAttributes.precision;
-    case Types.OTHER:
-    case Types.JAVA_OBJECT: {
-      switch (hiveType) {
-        case INTERVAL_YEAR_MONTH_TYPE:
-          // -yyyyyyy-mm  : should be more than enough
-          return 11;
-        case INTERVAL_DAY_TIME_TYPE:
-          // -ddddddddd hh:mm:ss.nnnnnnnnn
-          return 29;
-        default:
-          return Integer.MAX_VALUE;
-      }
-    }
-    case Types.ARRAY:
-    case Types.STRUCT:
-      return Integer.MAX_VALUE;
-    default:
-      throw new SQLException("Invalid column type: " + columnType);
+      case Types.ARRAY:
+      case Types.STRUCT:
+        return Integer.MAX_VALUE;
+      default:
+        throw new SQLException("Invalid column type: " + columnType);
     }
   }
 
@@ -342,33 +342,33 @@ public class JdbcColumn {
     int columnType = hiveTypeToSqlType(hiveType);
     // according to hiveTypeToSqlType possible options are:
     switch(columnType) {
-    case Types.NULL:
-    case Types.BOOLEAN:
-    case Types.CHAR:
-    case Types.VARCHAR:
-    case Types.TINYINT:
-    case Types.SMALLINT:
-    case Types.INTEGER:
-    case Types.BIGINT:
-    case Types.DATE:
-    case Types.BINARY:
-      return 0;
-    case Types.FLOAT:
-      return 7;
-    case Types.DOUBLE:
-      return 15;
-    case  Types.TIMESTAMP:
-    case Types.TIMESTAMP_WITH_TIMEZONE:
-      return 9;
-    case Types.DECIMAL:
-      return columnAttributes.scale;
-    case Types.OTHER:
-    case Types.JAVA_OBJECT:
-    case Types.ARRAY:
-    case Types.STRUCT:
-      return 0;
-    default:
-      throw new SQLException("Invalid column type: " + columnType);
+      case Types.NULL:
+      case Types.BOOLEAN:
+      case Types.CHAR:
+      case Types.VARCHAR:
+      case Types.TINYINT:
+      case Types.SMALLINT:
+      case Types.INTEGER:
+      case Types.BIGINT:
+      case Types.DATE:
+      case Types.BINARY:
+        return 0;
+      case Types.FLOAT:
+        return 7;
+      case Types.DOUBLE:
+        return 15;
+      case  Types.TIMESTAMP:
+      case Types.TIMESTAMP_WITH_TIMEZONE:
+        return 9;
+      case Types.DECIMAL:
+        return columnAttributes.scale;
+      case Types.OTHER:
+      case Types.JAVA_OBJECT:
+      case Types.ARRAY:
+      case Types.STRUCT:
+        return 0;
+      default:
+        throw new SQLException("Invalid column type: " + columnType);
     }
   }
 
@@ -376,29 +376,29 @@ public class JdbcColumn {
       throws SQLException {
     int columnType = hiveTypeToSqlType(hiveType);
     switch(columnType) {
-    case Types.TINYINT:
-    case Types.SMALLINT:
-    case Types.INTEGER:
-    case Types.BIGINT:
-      return true;
-    default:
-      return false;
+      case Types.TINYINT:
+      case Types.SMALLINT:
+      case Types.INTEGER:
+      case Types.BIGINT:
+        return true;
+      default:
+        return false;
     }
   }
 
   public Integer getNumPrecRadix() {
     final String t = type.toLowerCase();
     switch (t) {
-    case "tinyint":
-    case "smallint":
-    case "int":
-    case "bigint":
-    case "float":
-    case "double":
-    case "decimal":
-      return 10;
-    default:
-      return null;
+      case "tinyint":
+      case "smallint":
+      case "int":
+      case "bigint":
+      case "float":
+      case "double":
+      case "decimal":
+        return 10;
+      default:
+        return null;
     }
   }
 

Reply via email to