HIVE-14170: Beeline IncrementalRows should buffer rows and incrementally 
re-calculate width if TableOutputFormat is used (Sahil Takiar, reviewed by Tao 
Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ebad27d5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ebad27d5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ebad27d5

Branch: refs/heads/hive-14535
Commit: ebad27d5164440c9db3080808c2e66c53c1d8b4d
Parents: 60ec753
Author: Sahil Takiar <[email protected]>
Authored: Tue Aug 30 10:34:23 2016 -0500
Committer: Sergio Pena <[email protected]>
Committed: Tue Aug 30 10:34:23 2016 -0500

----------------------------------------------------------------------
 beeline/pom.xml                                 |  5 ++
 .../java/org/apache/hive/beeline/BeeLine.java   | 10 ++-
 .../org/apache/hive/beeline/BeeLineOpts.java    | 10 +++
 .../org/apache/hive/beeline/BufferedRows.java   | 23 ++++-
 .../apache/hive/beeline/IncrementalRows.java    | 10 +--
 .../IncrementalRowsWithNormalization.java       | 86 +++++++++++++++++++
 .../src/java/org/apache/hive/beeline/Rows.java  |  2 +-
 beeline/src/main/resources/BeeLine.properties   |  4 +
 .../TestIncrementalRowsWithNormalization.java   | 90 ++++++++++++++++++++
 9 files changed, 228 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/pom.xml
----------------------------------------------------------------------
diff --git a/beeline/pom.xml b/beeline/pom.xml
index dc89e81..d03f770 100644
--- a/beeline/pom.xml
+++ b/beeline/pom.xml
@@ -134,6 +134,11 @@
       <version>9.1-901.jdbc4</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/BeeLine.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLine.java 
b/beeline/src/java/org/apache/hive/beeline/BeeLine.java
index e0fa032..ecd60f6 100644
--- a/beeline/src/java/org/apache/hive/beeline/BeeLine.java
+++ b/beeline/src/java/org/apache/hive/beeline/BeeLine.java
@@ -2023,10 +2023,14 @@ public class BeeLine implements Closeable {
 
     Rows rows;
 
-    if (getOpts().getIncremental()) {
-      rows = new IncrementalRows(this, rs);
+    if (f instanceof TableOutputFormat) {
+      if (getOpts().getIncremental()) {
+        rows = new IncrementalRowsWithNormalization(this, rs);
+      } else {
+        rows = new BufferedRows(this, rs);
+      }
     } else {
-      rows = new BufferedRows(this, rs);
+      rows = new IncrementalRows(this, rs);
     }
     return f.print(rows);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java 
b/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java
index c44ac78..59fbca3 100644
--- a/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java
+++ b/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java
@@ -59,6 +59,7 @@ class BeeLineOpts implements Completer {
   public static final String DEFAULT_NULL_STRING = "NULL";
   public static final char DEFAULT_DELIMITER_FOR_DSV = '|';
   public static final int DEFAULT_MAX_COLUMN_WIDTH = 50;
+  public static final int DEFAULT_INCREMENTAL_BUFFER_ROWS = 1000;
 
   public static String URL_ENV_PREFIX = "BEELINE_URL_";
 
@@ -74,6 +75,7 @@ class BeeLineOpts implements Completer {
   private boolean verbose = false;
   private boolean force = false;
   private boolean incremental = false;
+  private int incrementalBufferRows = DEFAULT_INCREMENTAL_BUFFER_ROWS;
   private boolean showWarnings = false;
   private boolean showNestedErrs = false;
   private boolean showElapsedTime = true;
@@ -511,6 +513,14 @@ class BeeLineOpts implements Completer {
     return incremental;
   }
 
+  /**
+   * Sets the value behind the <code>--incrementalBufferRows</code> option, described in the
+   * usage text as the number of rows to buffer when printing rows on stdout (only applicable
+   * with <code>--incremental=true</code> and <code>--outputformat=table</code>).
+   * The value is stored as given; no validation is performed here.
+   *
+   * @param incrementalBufferRows the number of rows to buffer
+   */
+  public void setIncrementalBufferRows(int incrementalBufferRows) {
+    this.incrementalBufferRows = incrementalBufferRows;
+  }
+
+  /**
+   * Returns the value behind the <code>--incrementalBufferRows</code> option, which is
+   * initialized to {@link #DEFAULT_INCREMENTAL_BUFFER_ROWS}.
+   *
+   * @return the currently configured number of rows to buffer
+   */
+  public int getIncrementalBufferRows() {
+    return this.incrementalBufferRows;
+  }
+
   public void setSilent(boolean silent) {
     this.silent = silent;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/BufferedRows.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/BufferedRows.java 
b/beeline/src/java/org/apache/hive/beeline/BufferedRows.java
index 5604742..5369b08 100644
--- a/beeline/src/java/org/apache/hive/beeline/BufferedRows.java
+++ b/beeline/src/java/org/apache/hive/beeline/BufferedRows.java
@@ -27,6 +27,9 @@ import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.LinkedList;
 
+import com.google.common.base.Optional;
+
+
 /**
  * Rows implementation which buffers all rows in a linked list.
  */
@@ -36,21 +39,36 @@ class BufferedRows extends Rows {
   private int maxColumnWidth;
 
   BufferedRows(BeeLine beeLine, ResultSet rs) throws SQLException {
+    this(beeLine, rs, Optional.<Integer> absent());
+  }
+
+  BufferedRows(BeeLine beeLine, ResultSet rs, Optional<Integer> limit) throws 
SQLException {
     super(beeLine, rs);
     list = new LinkedList<Row>();
     int count = rsMeta.getColumnCount();
     list.add(new Row(count));
-    while (rs.next()) {
-      list.add(new Row(count, rs));
+
+    int numRowsBuffered = 0;
+    if (limit.isPresent()) {
+      while (limit.get() > numRowsBuffered && rs.next()) {
+        this.list.add(new Row(count, rs));
+        numRowsBuffered++;
+      }
+    } else {
+      while (rs.next()) {
+        this.list.add(new Row(count, rs));
+      }
     }
     iterator = list.iterator();
     maxColumnWidth = beeLine.getOpts().getMaxColumnWidth();
   }
 
+  @Override
   public boolean hasNext() {
     return iterator.hasNext();
   }
 
+  @Override
   public Object next() {
     return iterator.next();
   }
@@ -76,5 +94,4 @@ class BufferedRows extends Rows {
       row.sizes = max;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java 
b/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java
index 8aef976..f3f19a6 100644
--- a/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java
+++ b/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java
@@ -31,12 +31,12 @@ import java.util.NoSuchElementException;
  * without any buffering.
  */
 public class IncrementalRows extends Rows {
-  private final ResultSet rs;
+  protected final ResultSet rs;
   private final Row labelRow;
   private final Row maxRow;
   private Row nextRow;
   private boolean endOfResult;
-  private boolean normalizingWidths;
+  protected boolean normalizingWidths;
 
 
   IncrementalRows(BeeLine beeLine, ResultSet rs) throws SQLException {
@@ -53,8 +53,8 @@ public class IncrementalRows extends Rows {
       // normalized display width is based on maximum of display size
       // and label size
       maxRow.sizes[i] = Math.max(
-          maxRow.sizes[i],
-          rsMeta.getColumnDisplaySize(i + 1));
+              maxRow.sizes[i],
+              rsMeta.getColumnDisplaySize(i + 1));
       maxRow.sizes[i] = Math.min(maxWidth, maxRow.sizes[i]);
     }
 
@@ -104,4 +104,4 @@ public class IncrementalRows extends Rows {
     // for each row as it is produced
     normalizingWidths = true;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/IncrementalRowsWithNormalization.java
----------------------------------------------------------------------
diff --git 
a/beeline/src/java/org/apache/hive/beeline/IncrementalRowsWithNormalization.java
 
b/beeline/src/java/org/apache/hive/beeline/IncrementalRowsWithNormalization.java
new file mode 100644
index 0000000..6dbfe56
--- /dev/null
+++ 
b/beeline/src/java/org/apache/hive/beeline/IncrementalRowsWithNormalization.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This source file is based on code taken from SQLLine 1.0.2
+ * See SQLLine notice in LICENSE
+ */
+package org.apache.hive.beeline;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Extension of {@link IncrementalRows} which buffers "x" number of rows in memory at a time. It
+ * uses the {@link BufferedRows} class to do its buffering. The value of "x" is determined by the
+ * Beeline option <code>--incrementalBufferRows</code>, which defaults to
+ * {@link BeeLineOpts#DEFAULT_INCREMENTAL_BUFFER_ROWS}; a non-positive value is replaced by that
+ * default, since a buffer that holds no data rows would end the iteration without reading the
+ * {@link ResultSet}. Once the initial set of rows is buffered, the {@link #next()} method drains
+ * the buffer. Once the buffer is empty the next buffer is fetched, until the {@link ResultSet} is
+ * empty. The width of the rows is normalized within each buffer using the
+ * {@link BufferedRows#normalizeWidths()} method.
+ */
+public class IncrementalRowsWithNormalization extends IncrementalRows {
+
+  private final int incrementalBufferRows;
+  private BufferedRows buffer;
+
+  /**
+   * Buffers the first batch of rows from {@code rs} and normalizes their widths.
+   *
+   * @param beeLine the BeeLine instance whose options supply the buffer size
+   * @param rs the result set that rows are read from
+   * @throws SQLException propagated from the superclass constructor or from buffering the rows
+   */
+  IncrementalRowsWithNormalization(BeeLine beeLine, ResultSet rs) throws SQLException {
+    super(beeLine, rs);
+
+    // With a limit of zero or less, every BufferedRows would contain only the header row, so
+    // hasNext() would report the end of the data without ever calling rs.next().
+    int configuredBufferRows = beeLine.getOpts().getIncrementalBufferRows();
+    this.incrementalBufferRows = configuredBufferRows > 0
+        ? configuredBufferRows
+        : BeeLineOpts.DEFAULT_INCREMENTAL_BUFFER_ROWS;
+    this.buffer = new BufferedRows(beeLine, rs, Optional.of(this.incrementalBufferRows));
+    this.buffer.normalizeWidths();
+  }
+
+  /**
+   * Reports whether another row is available, fetching the next buffer from the
+   * {@link ResultSet} when the current one has been drained.
+   *
+   * @return {@code true} if {@link #next()} can return a row
+   * @throws RuntimeException wrapping any {@link SQLException} raised while refilling the buffer
+   */
+  @Override
+  public boolean hasNext() {
+    try {
+      if (this.buffer.hasNext()) {
+        return true;
+      } else {
+        // NOTE(review): a refill is attempted even when the previous buffer came back short,
+        // so rs.next() can be called again after it has already returned false. JDBC leaves
+        // that case driver-specific for forward-only result sets -- confirm the driver in use
+        // returns false rather than throwing.
+        this.buffer = new BufferedRows(this.beeLine, this.rs,
+                Optional.of(this.incrementalBufferRows));
+        if (this.normalizingWidths) {
+          this.buffer.normalizeWidths();
+        }
+
+        // Drain the first Row, which just contains column names
+        if (!this.buffer.hasNext()) {
+          return false;
+        }
+        this.buffer.next();
+
+        return this.buffer.hasNext();
+      }
+    } catch (SQLException ex) {
+      // Same message as before, but chain the SQLException as the cause so its stack trace
+      // and SQL state are not lost.
+      throw new RuntimeException(ex.toString(), ex);
+    }
+  }
+
+  /**
+   * Returns the next row from the current buffer. The initial buffer is not stripped of its
+   * leading column-name row, so the very first call yields that header row.
+   *
+   * @return the next row
+   * @throws NoSuchElementException if no rows remain
+   */
+  @Override
+  public Object next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return this.buffer.next();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/Rows.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Rows.java 
b/beeline/src/java/org/apache/hive/beeline/Rows.java
index 453f685..924b951 100644
--- a/beeline/src/java/org/apache/hive/beeline/Rows.java
+++ b/beeline/src/java/org/apache/hive/beeline/Rows.java
@@ -35,7 +35,7 @@ import java.util.Iterator;
  * Holds column values as strings
  */
 abstract class Rows implements Iterator {
-  private final BeeLine beeLine;
+  protected final BeeLine beeLine;
   final ResultSetMetaData rsMeta;
   final Boolean[] primaryKeys;
   final NumberFormat numberFormat;

http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/main/resources/BeeLine.properties
----------------------------------------------------------------------
diff --git a/beeline/src/main/resources/BeeLine.properties 
b/beeline/src/main/resources/BeeLine.properties
index 748c4b6..13321d2 100644
--- a/beeline/src/main/resources/BeeLine.properties
+++ b/beeline/src/main/resources/BeeLine.properties
@@ -184,6 +184,10 @@ cmd-usage: Usage: java org.apache.hive.cli.beeline.BeeLine 
\n \
 \                                  memory usage at the price of extra display 
column padding.\n \
 \                                  Setting --incremental=true is recommended 
if you encounter an OutOfMemory\n \
 \                                  on the client side (due to the fetched 
result set size being large).\n \
+\                                  Only applicable if --outputformat=table.\n \
+\  --incrementalBufferRows=NUMROWS the number of rows to buffer when printing 
rows on stdout,\n \
+\                                  defaults to 1000; only applicable if 
--incremental=true\n \
+\                                  and --outputformat=table\n \
 \  --truncateTable=[true/false]    truncate table column when it exceeds 
length\n \
 \  --delimiterForDSV=DELIMITER     specify the delimiter for 
delimiter-separated values output format (default: |)\n \
 \  --isolation=LEVEL               set the transaction isolation level\n \

http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/test/org/apache/hive/beeline/TestIncrementalRowsWithNormalization.java
----------------------------------------------------------------------
diff --git 
a/beeline/src/test/org/apache/hive/beeline/TestIncrementalRowsWithNormalization.java
 
b/beeline/src/test/org/apache/hive/beeline/TestIncrementalRowsWithNormalization.java
new file mode 100644
index 0000000..68da841
--- /dev/null
+++ 
b/beeline/src/test/org/apache/hive/beeline/TestIncrementalRowsWithNormalization.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.beeline;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import org.junit.Test;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+/**
+ * Unit test for {@link IncrementalRowsWithNormalization} that uses Mockito mocks for
+ * {@link BeeLine}, {@link BeeLineOpts} and the JDBC {@link ResultSet}.
+ */
+public class TestIncrementalRowsWithNormalization {
+
+  /**
+   * Checks that rows are pulled from the ResultSet one buffer at a time: with a buffer size of
+   * 5 and a ResultSet of 10 rows, ResultSet.next() is called 5 times by the constructor, is not
+   * called again while the first buffer is drained, and is called 5 more times once a row
+   * beyond the first buffer is requested.
+   */
+  @Test
+  public void testIncrementalRows() throws SQLException {
+    Integer incrementalBufferRows = 5;
+
+    // Mock BeeLineOpts
+    BeeLineOpts mockBeeLineOpts = mock(BeeLineOpts.class);
+    
when(mockBeeLineOpts.getIncrementalBufferRows()).thenReturn(incrementalBufferRows);
+    
when(mockBeeLineOpts.getMaxColumnWidth()).thenReturn(BeeLineOpts.DEFAULT_MAX_COLUMN_WIDTH);
+    when(mockBeeLineOpts.getNumberFormat()).thenReturn("default");
+    when(mockBeeLineOpts.getNullString()).thenReturn("NULL");
+
+    // Mock BeeLine
+    BeeLine mockBeeline = mock(BeeLine.class);
+    when(mockBeeline.getOpts()).thenReturn(mockBeeLineOpts);
+
+    // MockResultSet
+    ResultSet mockResultSet = mock(ResultSet.class);
+
+    ResultSetMetaData mockResultSetMetaData = mock(ResultSetMetaData.class);
+    when(mockResultSetMetaData.getColumnCount()).thenReturn(1);
+    when(mockResultSetMetaData.getColumnLabel(1)).thenReturn("Mock Table");
+    when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData);
+
+    // First 10 calls to resultSet.next() should return true
+    when(mockResultSet.next()).thenAnswer(new Answer<Boolean>() {
+      private int iterations = 10;
+
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        return this.iterations-- > 0;
+      }
+    });
+
+    when(mockResultSet.getString(1)).thenReturn("Hello World");
+
+    // IncrementalRows constructor should buffer the first 
"incrementalBufferRows" rows
+    IncrementalRowsWithNormalization incrementalRowsWithNormalization = new 
IncrementalRowsWithNormalization(
+            mockBeeline, mockResultSet);
+
+    // When the first buffer is loaded ResultSet.next() should be called 
"incrementalBufferRows" times
+    verify(mockResultSet, times(5)).next();
+
+    // Iterating through the buffer should not cause the next buffer to be 
fetched
+    // (the "+ 1" covers the column-name header row that precedes the data rows in the buffer)
+    for (int i = 0; i < incrementalBufferRows + 1; i++) {
+      incrementalRowsWithNormalization.next();
+    }
+    verify(mockResultSet, times(5)).next();
+
+    // When a new buffer is fetched ResultSet.next() should be called 
"incrementalBufferRows" more times
+    incrementalRowsWithNormalization.next();
+    verify(mockResultSet, times(10)).next();
+  }
+}

Reply via email to