HIVE-14170: Beeline IncrementalRows should buffer rows and incrementally re-calculate width if TableOutputFormat is used (Sahil Takiar, reviewed by Tao Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ebad27d5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ebad27d5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ebad27d5 Branch: refs/heads/hive-14535 Commit: ebad27d5164440c9db3080808c2e66c53c1d8b4d Parents: 60ec753 Author: Sahil Takiar <[email protected]> Authored: Tue Aug 30 10:34:23 2016 -0500 Committer: Sergio Pena <[email protected]> Committed: Tue Aug 30 10:34:23 2016 -0500 ---------------------------------------------------------------------- beeline/pom.xml | 5 ++ .../java/org/apache/hive/beeline/BeeLine.java | 10 ++- .../org/apache/hive/beeline/BeeLineOpts.java | 10 +++ .../org/apache/hive/beeline/BufferedRows.java | 23 ++++- .../apache/hive/beeline/IncrementalRows.java | 10 +-- .../IncrementalRowsWithNormalization.java | 86 +++++++++++++++++++ .../src/java/org/apache/hive/beeline/Rows.java | 2 +- beeline/src/main/resources/BeeLine.properties | 4 + .../TestIncrementalRowsWithNormalization.java | 90 ++++++++++++++++++++ 9 files changed, 228 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/pom.xml ---------------------------------------------------------------------- diff --git a/beeline/pom.xml b/beeline/pom.xml index dc89e81..d03f770 100644 --- a/beeline/pom.xml +++ b/beeline/pom.xml @@ -134,6 +134,11 @@ <version>9.1-901.jdbc4</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/BeeLine.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLine.java b/beeline/src/java/org/apache/hive/beeline/BeeLine.java index e0fa032..ecd60f6 100644 --- a/beeline/src/java/org/apache/hive/beeline/BeeLine.java +++ b/beeline/src/java/org/apache/hive/beeline/BeeLine.java @@ -2023,10 +2023,14 @@ public class BeeLine implements Closeable { Rows rows; - if (getOpts().getIncremental()) { - rows = new IncrementalRows(this, rs); + if (f instanceof TableOutputFormat) { + if (getOpts().getIncremental()) { + rows = new IncrementalRowsWithNormalization(this, rs); + } else { + rows = new BufferedRows(this, rs); + } } else { - rows = new BufferedRows(this, rs); + rows = new IncrementalRows(this, rs); } return f.print(rows); } http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java b/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java index c44ac78..59fbca3 100644 --- a/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java +++ b/beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java @@ -59,6 +59,7 @@ class BeeLineOpts implements Completer { public static final String DEFAULT_NULL_STRING = "NULL"; public static final char DEFAULT_DELIMITER_FOR_DSV = '|'; public static final int DEFAULT_MAX_COLUMN_WIDTH = 50; + public static final int DEFAULT_INCREMENTAL_BUFFER_ROWS = 1000; public static String URL_ENV_PREFIX = "BEELINE_URL_"; @@ -74,6 +75,7 @@ class BeeLineOpts implements Completer { private boolean verbose = false; private boolean force = false; private boolean incremental = false; + private int incrementalBufferRows = DEFAULT_INCREMENTAL_BUFFER_ROWS; private boolean showWarnings = false; private boolean showNestedErrs = false; private boolean showElapsedTime = true; @@ -511,6 +513,14 @@ class BeeLineOpts implements Completer { return incremental; } + public void setIncrementalBufferRows(int incrementalBufferRows) { + this.incrementalBufferRows = incrementalBufferRows; + } + + public int getIncrementalBufferRows() { + return this.incrementalBufferRows; + } + public void setSilent(boolean silent) { this.silent = silent; } http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/BufferedRows.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/BufferedRows.java b/beeline/src/java/org/apache/hive/beeline/BufferedRows.java index 5604742..5369b08 100644 --- a/beeline/src/java/org/apache/hive/beeline/BufferedRows.java +++ b/beeline/src/java/org/apache/hive/beeline/BufferedRows.java @@ -27,6 +27,9 @@ import java.sql.SQLException; import java.util.Iterator; import java.util.LinkedList; +import com.google.common.base.Optional; + + /** * Rows implementation which buffers all rows in a linked list. */ @@ -36,21 +39,36 @@ class BufferedRows extends Rows { private int maxColumnWidth; BufferedRows(BeeLine beeLine, ResultSet rs) throws SQLException { + this(beeLine, rs, Optional.<Integer> absent()); + } + + BufferedRows(BeeLine beeLine, ResultSet rs, Optional<Integer> limit) throws SQLException { super(beeLine, rs); list = new LinkedList<Row>(); int count = rsMeta.getColumnCount(); list.add(new Row(count)); - while (rs.next()) { - list.add(new Row(count, rs)); + + int numRowsBuffered = 0; + if (limit.isPresent()) { + while (limit.get() > numRowsBuffered && rs.next()) { + this.list.add(new Row(count, rs)); + numRowsBuffered++; + } + } else { + while (rs.next()) { + this.list.add(new Row(count, rs)); + } } iterator = list.iterator(); maxColumnWidth = beeLine.getOpts().getMaxColumnWidth(); } + @Override public boolean hasNext() { return iterator.hasNext(); } + @Override public Object next() { return iterator.next(); } @@ -76,5 +94,4 @@ class BufferedRows extends Rows { row.sizes = max; } } - } http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java b/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java index 8aef976..f3f19a6 100644 --- a/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java +++ b/beeline/src/java/org/apache/hive/beeline/IncrementalRows.java @@ -31,12 +31,12 @@ import java.util.NoSuchElementException; * without any buffering. */ public class IncrementalRows extends Rows { - private final ResultSet rs; + protected final ResultSet rs; private final Row labelRow; private final Row maxRow; private Row nextRow; private boolean endOfResult; - private boolean normalizingWidths; + protected boolean normalizingWidths; IncrementalRows(BeeLine beeLine, ResultSet rs) throws SQLException { @@ -53,8 +53,8 @@ public class IncrementalRows extends Rows { // normalized display width is based on maximum of display size // and label size maxRow.sizes[i] = Math.max( - maxRow.sizes[i], - rsMeta.getColumnDisplaySize(i + 1)); + maxRow.sizes[i], + rsMeta.getColumnDisplaySize(i + 1)); maxRow.sizes[i] = Math.min(maxWidth, maxRow.sizes[i]); } @@ -104,4 +104,4 @@ public class IncrementalRows extends Rows { // for each row as it is produced normalizingWidths = true; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/IncrementalRowsWithNormalization.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/IncrementalRowsWithNormalization.java b/beeline/src/java/org/apache/hive/beeline/IncrementalRowsWithNormalization.java new file mode 100644 index 0000000..6dbfe56 --- /dev/null +++ b/beeline/src/java/org/apache/hive/beeline/IncrementalRowsWithNormalization.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This source file is based on code taken from SQLLine 1.0.2 + * See SQLLine notice in LICENSE + */ +package org.apache.hive.beeline; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.NoSuchElementException; + +import com.google.common.base.Optional; + + +/** + * Extension of {@link IncrementalRows} which buffers "x" number of rows in memory at a time. It + * uses the {@link BufferedRows} class to do its buffering. The value of "x" is determined by the + * Beeline option <code>--incrementalBufferRows</code>, which defaults to + * {@link BeeLineOpts#DEFAULT_INCREMENTAL_BUFFER_ROWS}. Once the initial set of rows are buffered, it + * will allow the {@link #next()} method to drain the buffer. Once the buffer is empty the next + * buffer will be fetched until the {@link ResultSet} is empty. The width of the rows are normalized + * within each buffer using the {@link BufferedRows#normalizeWidths()} method. + */ +public class IncrementalRowsWithNormalization extends IncrementalRows { + + private final int incrementalBufferRows; + private BufferedRows buffer; + + IncrementalRowsWithNormalization(BeeLine beeLine, ResultSet rs) throws SQLException { + super(beeLine, rs); + + this.incrementalBufferRows = beeLine.getOpts().getIncrementalBufferRows(); + this.buffer = new BufferedRows(beeLine, rs, Optional.of(this.incrementalBufferRows)); + this.buffer.normalizeWidths(); + } + + @Override + public boolean hasNext() { + try { + if (this.buffer.hasNext()) { + return true; + } else { + this.buffer = new BufferedRows(this.beeLine, this.rs, + Optional.of(this.incrementalBufferRows)); + if (this.normalizingWidths) { + this.buffer.normalizeWidths(); + } + + // Drain the first Row, which just contains column names + if (!this.buffer.hasNext()) { + return false; + } + this.buffer.next(); + + return this.buffer.hasNext(); + } + } catch (SQLException ex) { + throw new RuntimeException(ex.toString()); + } + } + + @Override + public Object next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return this.buffer.next(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/java/org/apache/hive/beeline/Rows.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/Rows.java b/beeline/src/java/org/apache/hive/beeline/Rows.java index 453f685..924b951 100644 --- a/beeline/src/java/org/apache/hive/beeline/Rows.java +++ b/beeline/src/java/org/apache/hive/beeline/Rows.java @@ -35,7 +35,7 @@ import java.util.Iterator; * Holds column values as strings */ abstract class Rows implements Iterator { - private final BeeLine beeLine; + protected final BeeLine beeLine; final ResultSetMetaData rsMeta; final Boolean[] primaryKeys; final NumberFormat numberFormat; http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/main/resources/BeeLine.properties ---------------------------------------------------------------------- diff --git a/beeline/src/main/resources/BeeLine.properties b/beeline/src/main/resources/BeeLine.properties index 748c4b6..13321d2 100644 --- a/beeline/src/main/resources/BeeLine.properties +++ b/beeline/src/main/resources/BeeLine.properties @@ -184,6 +184,10 @@ cmd-usage: Usage: java org.apache.hive.cli.beeline.BeeLine \n \ \ memory usage at the price of extra display column padding.\n \ \ Setting --incremental=true is recommended if you encounter an OutOfMemory\n \ \ on the client side (due to the fetched result set size being large).\n \ +\ Only applicable if --outputformat=table.\n \ +\ --incrementalBufferRows=NUMROWS the number of rows to buffer when printing rows on stdout,\n \ +\ defaults to 1000; only applicable if --incremental=true\n \ +\ and --outputformat=table\n \ \ --truncateTable=[true/false] truncate table column when it exceeds length\n \ \ --delimiterForDSV=DELIMITER specify the delimiter for delimiter-separated values output format (default: |)\n \ \ --isolation=LEVEL set the transaction isolation level\n \ http://git-wip-us.apache.org/repos/asf/hive/blob/ebad27d5/beeline/src/test/org/apache/hive/beeline/TestIncrementalRowsWithNormalization.java ---------------------------------------------------------------------- diff --git a/beeline/src/test/org/apache/hive/beeline/TestIncrementalRowsWithNormalization.java b/beeline/src/test/org/apache/hive/beeline/TestIncrementalRowsWithNormalization.java new file mode 100644 index 0000000..68da841 --- /dev/null +++ b/beeline/src/test/org/apache/hive/beeline/TestIncrementalRowsWithNormalization.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.beeline; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import org.junit.Test; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + + +public class TestIncrementalRowsWithNormalization { + + @Test + public void testIncrementalRows() throws SQLException { + Integer incrementalBufferRows = 5; + + // Mock BeeLineOpts + BeeLineOpts mockBeeLineOpts = mock(BeeLineOpts.class); + when(mockBeeLineOpts.getIncrementalBufferRows()).thenReturn(incrementalBufferRows); + when(mockBeeLineOpts.getMaxColumnWidth()).thenReturn(BeeLineOpts.DEFAULT_MAX_COLUMN_WIDTH); + when(mockBeeLineOpts.getNumberFormat()).thenReturn("default"); + when(mockBeeLineOpts.getNullString()).thenReturn("NULL"); + + // Mock BeeLine + BeeLine mockBeeline = mock(BeeLine.class); + when(mockBeeline.getOpts()).thenReturn(mockBeeLineOpts); + + // MockResultSet + ResultSet mockResultSet = mock(ResultSet.class); + + ResultSetMetaData mockResultSetMetaData = mock(ResultSetMetaData.class); + when(mockResultSetMetaData.getColumnCount()).thenReturn(1); + when(mockResultSetMetaData.getColumnLabel(1)).thenReturn("Mock Table"); + when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); + + // First 10 calls to resultSet.next() should return true + when(mockResultSet.next()).thenAnswer(new Answer<Boolean>() { + private int iterations = 10; + + @Override + public Boolean answer(InvocationOnMock invocation) { + return this.iterations-- > 0; + } + }); + + when(mockResultSet.getString(1)).thenReturn("Hello World"); + + // IncrementalRows constructor should buffer the first "incrementalBufferRows" rows + IncrementalRowsWithNormalization incrementalRowsWithNormalization = new IncrementalRowsWithNormalization( + mockBeeline, mockResultSet); + + // When the first buffer is loaded ResultSet.next() should be called "incrementalBufferRows" times + verify(mockResultSet, times(5)).next(); + + // Iterating through the buffer should not cause the next buffer to be fetched + for (int i = 0; i < incrementalBufferRows + 1; i++) { + incrementalRowsWithNormalization.next(); + } + verify(mockResultSet, times(5)).next(); + + // When a new buffer is fetched ResultSet.next() should be called "incrementalBufferRows" more times + incrementalRowsWithNormalization.next(); + verify(mockResultSet, times(10)).next(); + } +}
