This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c914a1  [LIVY-572] Avoid usage of spark classes in ColumnBuffer
1c914a1 is described below

commit 1c914a1f0fab80a11285ea7f602bb445b797c0a8
Author: Marco Gaido <mga...@apache.org>
AuthorDate: Tue Mar 26 17:43:16 2019 -0700

    [LIVY-572] Avoid usage of spark classes in ColumnBuffer
    
    ## What changes were proposed in this pull request?
    
    The `ColumnBuffers` can be created both inside spark jobs and in the Livy 
server. The latter case happens when operation logs are returned and  fails 
before this patch because we are using Spark classes in this code after the 
refactor in LIVY-503. Unfortunately, we do not have test coverage for operation 
logs retrieval and this is the reason why this wasn't spot out earlier.
    
    Since operation logs are retrieved by beeline for each query, this means 
that every query run through beeline fails, unless 
`livy.server.thrift.logging.operation.enabled` is set to `false`.
    
    ## How was this patch tested?
    
    manual tests using beeline
    
    Author: Marco Gaido <mga...@apache.org>
    
    Closes #162 from mgaido91/LIVY-572.
---
 .../livy/thriftserver/session/ColumnBuffer.java    | 67 +------------------
 .../livy/thriftserver/session/ResultSet.java       | 76 +++++++++++++++++++++-
 2 files changed, 76 insertions(+), 67 deletions(-)

diff --git 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
index c29980b..4408586 100644
--- 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
+++ 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
@@ -16,26 +16,12 @@
  */
 package org.apache.livy.thriftserver.session;
 
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-import scala.Tuple2;
-import scala.collection.Map;
-import scala.collection.Seq;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.StructField;
 
 /**
  * Container for the contents of a single column in a result set.
@@ -180,7 +166,7 @@ public class ColumnBuffer {
       buffers[currentSize] = (byte[]) value;
       break;
     case STRING:
-      strings[currentSize] = toHiveString(value, false);
+      strings[currentSize] = (String) value;
       break;
     }
 
@@ -264,57 +250,6 @@ public class ColumnBuffer {
     nulls[byteIdx] = (byte) (nulls[byteIdx] | (1 << bitIdx));
   }
 
-  /**
-   * Converts a value from a Spark dataset into a string that looks like what 
Hive would
-   * generate. Because Spark generates rows that contain Scala types for 
non-primitive
-   * columns, this code depends on Scala and is thus succeptible to binary 
compatibility
-   * changes in the Scala libraries.
-   *
-   * The supported types are described in Spark's SQL programming guide, in 
the table
-   * listing the mapping of SQL types to Scala types.
-   *
-   * @param value The object to stringify.
-   * @param quoteStrings Whether to wrap String instances in quotes.
-   */
-  private String toHiveString(Object value, boolean quoteStrings) {
-    if (quoteStrings && value instanceof String) {
-      return "\"" + value + "\"";
-    } else if (value instanceof BigDecimal) {
-      return ((BigDecimal) value).stripTrailingZeros().toString();
-    } else if (value instanceof Map) {
-      return stream(new ScalaIterator<>(((Map<?,?>) value).iterator()))
-        .map(o -> toHiveString(o, true))
-        .sorted()
-        .collect(Collectors.joining(",", "{", "}"));
-    } else if (value instanceof Seq) {
-      return stream(new ScalaIterator<>(((Seq<?>) value).iterator()))
-        .map(o -> toHiveString(o, true))
-        .collect(Collectors.joining(",", "[", "]"));
-    } else if (value instanceof Tuple2) {
-      Tuple2 t = (Tuple2) value;
-      return String.format("%s:%s", toHiveString(t._1(), true), 
toHiveString(t._2(), true));
-    } else if (value instanceof Row) {
-      Row r = (Row) value;
-      final StructField[] fields = r.schema().fields();
-      final AtomicInteger idx = new AtomicInteger();
-
-      return stream(new ScalaIterator<>(r.toSeq().iterator()))
-        .map(o -> {
-          String fname = fields[idx.getAndIncrement()].name();
-          String fval = toHiveString(o, true);
-          return String.format("\"%s\":%s", fname, fval);
-        })
-        .collect(Collectors.joining(",", "{", "}"));
-    } else {
-      return value.toString();
-    }
-  }
-
-  private Stream<?> stream(Iterator<?> it) {
-    return StreamSupport.stream(
-      Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
-  }
-
   private void ensureCapacity() {
     int nextSize = (currentSize + DEFAULT_SIZE);
     nextSize = nextSize - (nextSize % DEFAULT_SIZE);
diff --git 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java
 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java
index 3fb69f4..317c4e6 100644
--- 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java
+++ 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java
@@ -16,6 +16,22 @@
  */
 package org.apache.livy.thriftserver.session;
 
+import java.math.BigDecimal;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import scala.Tuple2;
+import scala.collection.Map;
+import scala.collection.Seq;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+
 /**
  * Utility class used for transferring results from the Spark application to 
the Livy server.
  */
@@ -43,7 +59,13 @@ public class ResultSet {
     }
 
     for (int i = 0; i < fields.length; i++) {
-      columns[i].add(fields[i]);
+      Object value;
+      if (columns[i].getType() == DataType.STRING) {
+        value = toHiveString(fields[i], false);
+      } else {
+        value = fields[i];
+      }
+      columns[i].add(value);
     }
   }
 
@@ -55,4 +77,56 @@ public class ResultSet {
     return columns;
   }
 
+  /**
+   * Converts a value from a Spark dataset into a string that looks like what 
Hive would
+   * generate. Because Spark generates rows that contain Scala types for 
non-primitive
+   * columns, this code depends on Scala and is thus susceptible to binary 
compatibility
+   * changes in the Scala libraries.
+   *
+   * The supported types are described in Spark's SQL programming guide, in 
the table
+   * listing the mapping of SQL types to Scala types.
+   *
+   * @param value The object to stringify.
+   * @param quoteStrings Whether to wrap String instances in quotes.
+   */
+  private String toHiveString(Object value, boolean quoteStrings) {
+    if (value == null) {
+      return null;
+    } else if (quoteStrings && value instanceof String) {
+      return "\"" + value + "\"";
+    } else if (value instanceof BigDecimal) {
+      return ((BigDecimal) value).stripTrailingZeros().toString();
+    } else if (value instanceof Map) {
+      return stream(new ScalaIterator<>(((Map<?,?>) value).iterator()))
+        .map(o -> toHiveString(o, true))
+        .sorted()
+        .collect(Collectors.joining(",", "{", "}"));
+    } else if (value instanceof Seq) {
+      return stream(new ScalaIterator<>(((Seq<?>) value).iterator()))
+        .map(o -> toHiveString(o, true))
+        .collect(Collectors.joining(",", "[", "]"));
+    } else if (value instanceof Tuple2) {
+      Tuple2 t = (Tuple2) value;
+      return String.format("%s:%s", toHiveString(t._1(), true), 
toHiveString(t._2(), true));
+    } else if (value instanceof Row) {
+      Row r = (Row) value;
+      final StructField[] fields = r.schema().fields();
+      final AtomicInteger idx = new AtomicInteger();
+
+      return stream(new ScalaIterator<>(r.toSeq().iterator()))
+        .map(o -> {
+          String fname = fields[idx.getAndIncrement()].name();
+          String fval = toHiveString(o, true);
+          return String.format("\"%s\":%s", fname, fval);
+        })
+        .collect(Collectors.joining(",", "{", "}"));
+    } else {
+      return value.toString();
+    }
+  }
+
+  private Stream<?> stream(Iterator<?> it) {
+    return StreamSupport.stream(
+      Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
+  }
 }

Reply via email to