http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/util/src/main/java/org/apache/hive/beeline/QFileBeeLineClient.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hive/beeline/QFileBeeLineClient.java b/itests/util/src/main/java/org/apache/hive/beeline/QFileBeeLineClient.java new file mode 100644 index 0000000..f1b53f7 --- /dev/null +++ b/itests/util/src/main/java/org/apache/hive/beeline/QFileBeeLineClient.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.beeline; + +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.sql.SQLException; + +/** + * QFile test client using BeeLine. It can be used to submit a list of command strings, or a QFile. + */ +public class QFileBeeLineClient implements AutoCloseable { + private BeeLine beeLine; + private PrintStream beelineOutputStream; + private File logFile; + + protected QFileBeeLineClient(String jdbcUrl, String jdbcDriver, String username, String password, + File log) throws IOException { + logFile = log; + beeLine = new BeeLine(); + beelineOutputStream = new PrintStream(logFile, "UTF-8"); + beeLine.setOutputStream(beelineOutputStream); + beeLine.setErrorStream(beelineOutputStream); + beeLine.runCommands( + new String[] { + "!set verbose true", + "!set shownestederrs true", + "!set showwarnings true", + "!set showelapsedtime false", + "!set trimscripts false", + "!set maxwidth -1", + "!connect " + jdbcUrl + " " + username + " " + password + " " + jdbcDriver + }); + } + + public void execute(String[] commands, File resultFile) throws SQLException { + beeLine.runCommands( + new String[] { + "!record " + resultFile.getAbsolutePath() + }); + + int lastSuccessfulCommand = beeLine.runCommands(commands); + if (commands.length != lastSuccessfulCommand) { + throw new SQLException("Error executing SQL command: " + commands[lastSuccessfulCommand]); + } + + beeLine.runCommands(new String[] {"!record"}); + } + + private void beforeExecute(QFile qFile) throws SQLException { + execute( + new String[] { + "!set outputformat tsv2", + "!set verbose false", + "!set silent true", + "!set showheader false", + "USE default;", + "SHOW TABLES;", + "DROP DATABASE IF EXISTS `" + qFile.getName() + "` CASCADE;", + "CREATE DATABASE `" + qFile.getName() + "`;", + "USE `" + qFile.getName() + "`;", + "set hive.in.test.short.logs=true;", + "set hive.in.test.remove.logs=false;", + }, + qFile.getBeforeExecuteLogFile()); + beeLine.setIsTestMode(true); + } + + private void afterExecute(QFile qFile) throws SQLException { + beeLine.setIsTestMode(false); + execute( + new String[] { + "set hive.in.test.short.logs=false;", + "!set verbose true", + "!set silent false", + "!set showheader true", + "!set outputformat table", + "USE default;", + "DROP DATABASE IF EXISTS `" + qFile.getName() + "` CASCADE;", + }, + qFile.getAfterExecuteLogFile()); + } + + public void execute(QFile qFile) throws SQLException, IOException { + beforeExecute(qFile); + String[] commands = beeLine.getCommands(qFile.getInputFile()); + execute(qFile.filterCommands(commands), qFile.getRawOutputFile()); + afterExecute(qFile); + } + + public void close() { + if (beeLine != null) { + beeLine.runCommands(new String[] { + "!quit" + }); + } + if (beelineOutputStream != null) { + beelineOutputStream.close(); + } + } + + /** + * Builder to generated QFileBeeLineClient objects. The after initializing the builder, it can be + * used to create new clients without any parameters. + */ + public static class QFileClientBuilder { + private String username; + private String password; + private String jdbcUrl; + private String jdbcDriver; + + public QFileClientBuilder() { + } + + public QFileClientBuilder setUsername(String username) { + this.username = username; + return this; + } + + public QFileClientBuilder setPassword(String password) { + this.password = password; + return this; + } + + public QFileClientBuilder setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + return this; + } + + public QFileClientBuilder setJdbcDriver(String jdbcDriver) { + this.jdbcDriver = jdbcDriver; + return this; + } + + public QFileBeeLineClient getClient(File logFile) throws IOException { + return new QFileBeeLineClient(jdbcUrl, jdbcDriver, username, password, logFile); + } + } +}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/util/src/main/java/org/apache/hive/beeline/package-info.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hive/beeline/package-info.java b/itests/util/src/main/java/org/apache/hive/beeline/package-info.java new file mode 100644 index 0000000..e05ac0a --- /dev/null +++ b/itests/util/src/main/java/org/apache/hive/beeline/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package for the BeeLine specific QTest classes. + */ +package org.apache.hive.beeline; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/util/src/main/java/org/apache/hive/beeline/qfile/QFile.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hive/beeline/qfile/QFile.java b/itests/util/src/main/java/org/apache/hive/beeline/qfile/QFile.java deleted file mode 100644 index 49d6d24..0000000 --- a/itests/util/src/main/java/org/apache/hive/beeline/qfile/QFile.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.beeline.qfile; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.util.Shell; -import org.apache.hive.common.util.StreamPrinter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -/** - * Class for representing a Query and the connected files. It provides accessors for the specific - * input and output files, and provides methods for filtering the output of the runs. - */ -public final class QFile { - private static final Logger LOG = LoggerFactory.getLogger(QFile.class.getName()); - - private String name; - private File inputFile; - private File rawOutputFile; - private File outputFile; - private File expcetedOutputFile; - private File logFile; - private File infraLogFile; - private static RegexFilterSet staticFilterSet = getStaticFilterSet(); - private RegexFilterSet specificFilterSet; - - private QFile() {} - - public String getName() { - return name; - } - - public File getInputFile() { - return inputFile; - } - - public File getRawOutputFile() { - return rawOutputFile; - } - - public File getOutputFile() { - return outputFile; - } - - public File getExpectedOutputFile() { - return expcetedOutputFile; - } - - public File getLogFile() { - return logFile; - } - - public File getInfraLogFile() { - return infraLogFile; - } - - public void filterOutput() throws IOException { - String rawOutput = FileUtils.readFileToString(rawOutputFile); - String filteredOutput = staticFilterSet.filter(specificFilterSet.filter(rawOutput)); - FileUtils.writeStringToFile(outputFile, filteredOutput); - } - - public boolean compareResults() throws IOException, InterruptedException { - if (!expcetedOutputFile.exists()) { - LOG.error("Expected results file does not exist: " + expcetedOutputFile); - return false; - } - return executeDiff(); - } - - public void overwriteResults() throws IOException { - if (expcetedOutputFile.exists()) { - FileUtils.forceDelete(expcetedOutputFile); - } - FileUtils.copyFile(outputFile, expcetedOutputFile); - } - - private boolean executeDiff() throws IOException, InterruptedException { - List<String> diffCommandArgs = new ArrayList<String>(); - diffCommandArgs.add("diff"); - - // Text file comparison - diffCommandArgs.add("-a"); - - if (Shell.WINDOWS) { - // Ignore changes in the amount of white space - diffCommandArgs.add("-b"); - - // Files created on Windows machines have different line endings - // than files created on Unix/Linux. Windows uses carriage return and line feed - // ("\r\n") as a line ending, whereas Unix uses just line feed ("\n"). - // Also StringBuilder.toString(), Stream to String conversions adds extra - // spaces at the end of the line. - diffCommandArgs.add("--strip-trailing-cr"); // Strip trailing carriage return on input - diffCommandArgs.add("-B"); // Ignore changes whose lines are all blank - } - - // Add files to compare to the arguments list - diffCommandArgs.add(getQuotedString(expcetedOutputFile)); - diffCommandArgs.add(getQuotedString(outputFile)); - - System.out.println("Running: " + org.apache.commons.lang.StringUtils.join(diffCommandArgs, - ' ')); - Process executor = Runtime.getRuntime().exec(diffCommandArgs.toArray( - new String[diffCommandArgs.size()])); - - StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err); - StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); - - outPrinter.start(); - errPrinter.start(); - - int result = executor.waitFor(); - - outPrinter.join(); - errPrinter.join(); - - executor.waitFor(); - - return (result == 0); - } - - private static String getQuotedString(File file) { - return Shell.WINDOWS ? String.format("\"%s\"", file.getAbsolutePath()) : file.getAbsolutePath(); - } - - private static class RegexFilterSet { - private final Map<Pattern, String> regexFilters = new LinkedHashMap<Pattern, String>(); - - public RegexFilterSet addFilter(String regex, String replacement) { - regexFilters.put(Pattern.compile(regex), replacement); - return this; - } - - public String filter(String input) { - for (Pattern pattern : regexFilters.keySet()) { - input = pattern.matcher(input).replaceAll(regexFilters.get(pattern)); - } - return input; - } - } - - // These are the filters which are common for every QTest. - // Check specificFilterSet for QTest specific ones. - private static RegexFilterSet getStaticFilterSet() { - // Extract the leading four digits from the unix time value. - // Use this as a prefix in order to increase the selectivity - // of the unix time stamp replacement regex. - String currentTimePrefix = Long.toString(System.currentTimeMillis()).substring(0, 4); - - String userName = System.getProperty("user.name"); - - String timePattern = "(Mon|Tue|Wed|Thu|Fri|Sat|Sun) " - + "(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) " - + "\\d{2} \\d{2}:\\d{2}:\\d{2} \\w+ 20\\d{2}"; - // Pattern to remove the timestamp and other infrastructural info from the out file - String logPattern = "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2},\\d*\\s+\\S+\\s+\\[" + - ".*\\]\\s+\\S+:\\s+"; - String operatorPattern = "\"(CONDITION|COPY|DEPENDENCY_COLLECTION|DDL" - + "|EXPLAIN|FETCH|FIL|FS|FUNCTION|GBY|HASHTABLEDUMMY|HASTTABLESINK|JOIN" - + "|LATERALVIEWFORWARD|LIM|LVJ|MAP|MAPJOIN|MAPRED|MAPREDLOCAL|MOVE|OP|RS" - + "|SCR|SEL|STATS|TS|UDTF|UNION)_\\d+\""; - - return new RegexFilterSet() - .addFilter(logPattern, "") - .addFilter("Getting log thread is interrupted, since query is done!\n", "") - .addFilter("going to print operations logs\n", "") - .addFilter("printed operations logs\n", "") - .addFilter("\\(queryId=[^\\)]*\\)", "queryId=(!!{queryId}!!)") - .addFilter("file:/\\w\\S+", "file:/!!ELIDED!!") - .addFilter("pfile:/\\w\\S+", "pfile:/!!ELIDED!!") - .addFilter("hdfs:/\\w\\S+", "hdfs:/!!ELIDED!!") - .addFilter("last_modified_by=\\w+", "last_modified_by=!!ELIDED!!") - .addFilter(timePattern, "!!TIMESTAMP!!") - .addFilter("(\\D)" + currentTimePrefix + "\\d{6}(\\D)", "$1!!UNIXTIME!!$2") - .addFilter("(\\D)" + currentTimePrefix + "\\d{9}(\\D)", "$1!!UNIXTIMEMILLIS!!$2") - .addFilter(userName, "!!{user.name}!!") - .addFilter(operatorPattern, "\"$1_!!ELIDED!!\"") - .addFilter("Time taken: [0-9\\.]* seconds", "Time taken: !!ELIDED!! seconds"); - } - - /** - * Builder to generate QFile objects. After initializing the builder it is possible the - * generate the next QFile object using it's name only. - */ - public static class QFileBuilder { - private File queryDirectory; - private File logDirectory; - private File resultsDirectory; - private String scratchDirectoryString; - private String warehouseDirectoryString; - private File hiveRootDirectory; - - public QFileBuilder() { - } - - public QFileBuilder setQueryDirectory(File queryDirectory) { - this.queryDirectory = queryDirectory; - return this; - } - - public QFileBuilder setLogDirectory(File logDirectory) { - this.logDirectory = logDirectory; - return this; - } - - public QFileBuilder setResultsDirectory(File resultsDirectory) { - this.resultsDirectory = resultsDirectory; - return this; - } - - public QFileBuilder setScratchDirectoryString(String scratchDirectoryString) { - this.scratchDirectoryString = scratchDirectoryString; - return this; - } - - public QFileBuilder setWarehouseDirectoryString(String warehouseDirectoryString) { - this.warehouseDirectoryString = warehouseDirectoryString; - return this; - } - - public QFileBuilder setHiveRootDirectory(File hiveRootDirectory) { - this.hiveRootDirectory = hiveRootDirectory; - return this; - } - - public QFile getQFile(String name) throws IOException { - QFile result = new QFile(); - result.name = name; - result.inputFile = new File(queryDirectory, name + ".q"); - result.rawOutputFile = new File(logDirectory, name + ".q.out.raw"); - result.outputFile = new File(logDirectory, name + ".q.out"); - result.expcetedOutputFile = new File(resultsDirectory, name + ".q.out"); - result.logFile = new File(logDirectory, name + ".q.beeline"); - result.infraLogFile = new File(logDirectory, name + ".q.out.infra"); - // These are the filters which are specific for the given QTest. - // Check staticFilterSet for common filters. - result.specificFilterSet = new RegexFilterSet() - .addFilter(scratchDirectoryString + "[\\w\\-/]+", "!!{hive.exec.scratchdir}!!") - .addFilter(warehouseDirectoryString, "!!{hive.metastore.warehouse.dir}!!") - .addFilter(resultsDirectory.getAbsolutePath(), "!!{expectedDirectory}!!") - .addFilter(logDirectory.getAbsolutePath(), "!!{outputDirectory}!!") - .addFilter(queryDirectory.getAbsolutePath(), "!!{qFileDirectory}!!") - .addFilter(hiveRootDirectory.getAbsolutePath(), "!!{hive.root}!!"); - return result; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/util/src/main/java/org/apache/hive/beeline/qfile/QFileBeeLineClient.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hive/beeline/qfile/QFileBeeLineClient.java b/itests/util/src/main/java/org/apache/hive/beeline/qfile/QFileBeeLineClient.java deleted file mode 100644 index b6eac89..0000000 --- a/itests/util/src/main/java/org/apache/hive/beeline/qfile/QFileBeeLineClient.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.beeline.qfile; - -import org.apache.hive.beeline.BeeLine; - -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; - -/** - * QFile test client using BeeLine. It can be used to submit a list of command strings, or a QFile. - */ -public class QFileBeeLineClient implements AutoCloseable { - private BeeLine beeLine; - private PrintStream beelineOutputStream; - private File logFile; - - protected QFileBeeLineClient(String jdbcUrl, String jdbcDriver, String username, String password, - File log) throws IOException { - logFile = log; - beeLine = new BeeLine(); - beelineOutputStream = new PrintStream(logFile, "UTF-8"); - beeLine.setOutputStream(beelineOutputStream); - beeLine.setErrorStream(beelineOutputStream); - beeLine.runCommands( - new String[] { - "!set verbose true", - "!set shownestederrs true", - "!set showwarnings true", - "!set showelapsedtime false", - "!set maxwidth -1", - "!connect " + jdbcUrl + " " + username + " " + password + " " + jdbcDriver - }); - } - - public boolean execute(String[] commands, File resultFile) { - boolean hasErrors = false; - beeLine.runCommands( - new String[] { - "!set outputformat csv", - "!record " + resultFile.getAbsolutePath() - }); - - if (commands.length != beeLine.runCommands(commands)) { - hasErrors = true; - } - - beeLine.runCommands(new String[] {"!record"}); - return !hasErrors; - } - - private void beforeExecute(QFile qFile) { - assert(execute( - new String[] { - "USE default;", - "SHOW TABLES;", - "DROP DATABASE IF EXISTS `" + qFile.getName() + "` CASCADE;", - "CREATE DATABASE `" + qFile.getName() + "`;", - "USE `" + qFile.getName() + "`;" - }, - qFile.getInfraLogFile())); - } - - private void afterExecute(QFile qFile) { - assert(execute( - new String[] { - "USE default;", - "DROP DATABASE IF EXISTS `" + qFile.getName() + "` CASCADE;", - }, - qFile.getInfraLogFile())); - } - - public boolean execute(QFile qFile) { - beforeExecute(qFile); - boolean result = execute( - new String[] { - "!run " + qFile.getInputFile().getAbsolutePath() - }, - qFile.getRawOutputFile()); - afterExecute(qFile); - return result; - } - - public void close() { - if (beeLine != null) { - beeLine.runCommands(new String[] { - "!quit" - }); - } - if (beelineOutputStream != null) { - beelineOutputStream.close(); - } - } - - /** - * Builder to generated QFileBeeLineClient objects. The after initializing the builder, it can be - * used to create new clients without any parameters. - */ - public static class QFileClientBuilder { - private String username; - private String password; - private String jdbcUrl; - private String jdbcDriver; - - public QFileClientBuilder() { - } - - public QFileClientBuilder setUsername(String username) { - this.username = username; - return this; - } - - public QFileClientBuilder setPassword(String password) { - this.password = password; - return this; - } - - public QFileClientBuilder setJdbcUrl(String jdbcUrl) { - this.jdbcUrl = jdbcUrl; - return this; - } - - public QFileClientBuilder setJdbcDriver(String jdbcDriver) { - this.jdbcDriver = jdbcDriver; - return this; - } - - public QFileBeeLineClient getClient(File logFile) throws IOException { - return new QFileBeeLineClient(jdbcUrl, jdbcDriver, username, password, logFile); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/util/src/main/java/org/apache/hive/beeline/qfile/package-info.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hive/beeline/qfile/package-info.java b/itests/util/src/main/java/org/apache/hive/beeline/qfile/package-info.java deleted file mode 100644 index fcd50ec..0000000 --- a/itests/util/src/main/java/org/apache/hive/beeline/qfile/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package for the BeeLine specific QTest file classes. - */ -package org.apache.hive.beeline.qfile; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/jdbc-handler/pom.xml ---------------------------------------------------------------------- diff --git a/jdbc-handler/pom.xml b/jdbc-handler/pom.xml index 364886a..6c6e1fa 100644 --- a/jdbc-handler/pom.xml +++ b/jdbc-handler/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 8adf67b..1294a61 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -168,6 +168,18 @@ </includes> </filter> <filter> + <artifact>org.apache.parquet:parquet-hadoop-bundle</artifact> + <excludes> + <exclude>shaded/parquet/org/codehaus/jackson/**</exclude> + </excludes> + </filter> + <filter> + <artifact>org.apache.logging.log4j:log4j-core</artifact> + <excludes> + <exclude>org/apache/logging/log4j/core/jackson/**</exclude> + </excludes> + </filter> + <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> @@ -180,11 +192,7 @@ <artifactSet> <excludes> <exclude>org.apache.commons:commons-compress</exclude> - <exclude>org.apache.hadoop:hadoop-yarn*</exclude> - <exclude>org.apache.hadoop:hadoop-mapreduce*</exclude> - <exclude>org.apache.hadoop:hadoop-hdfs</exclude> - <exclude>org.apache.hadoop:hadoop-client</exclude> - <exclude>org.apache.hadoop:hadoop-annotations</exclude> + <exclude>org.apache.hadoop:*</exclude> <exclude>org.apache.hive:hive-vector-code-gen</exclude> <exclude>org.apache.ant:*</exclude> <exclude>junit:*</exclude> @@ -229,6 +237,7 @@ <exclude>com.thoughtworks.paranamer:*</exclude> <exclude>com.twitter:*</exclude> <exclude>com.zaxxer:*</exclude> + <exclude>com.fasterxml.jackson.core:*</exclude> <exclude>io.netty:*</exclude> <exclude>javax.activation:*</exclude> <exclude>javax.inject:*</exclude> @@ -292,13 +301,6 @@ <shadedPattern>org.apache.hive.com.facebook</shadedPattern> </relocation> <relocation> - <pattern>org.apache.hadoop</pattern> - <shadedPattern>org.apache.hive.org.apache.hadoop</shadedPattern> - <excludes> - <exclude>org.apache.hadoop.security.*</exclude> - </excludes> - </relocation> - <relocation> <pattern>org.apache.zookeeper</pattern> <shadedPattern>org.apache.hive.org.apache.zookeeper</shadedPattern> </relocation> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 1695c5d..fb18adb 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -41,6 +41,7 @@ import org.apache.hive.service.rpc.thrift.TSessionHandle; import org.apache.http.HttpRequestInterceptor; import org.apache.http.HttpResponse; import org.apache.http.client.CookieStore; +import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.ServiceUnavailableRetryStrategy; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; @@ -386,9 +387,9 @@ public class HiveConnection implements java.sql.Connection { * Add an interceptor to pass username/password in the header. * In https mode, the entire information is encrypted */ - requestInterceptor = new HttpBasicAuthInterceptor(getUserName(), getPassword(), - cookieStore, cookieName, useSsl, - additionalHttpHeaders); + requestInterceptor = + new HttpBasicAuthInterceptor(getUserName(), getPassword(), cookieStore, cookieName, + useSsl, additionalHttpHeaders); } } // Configure http client for cookie based authentication @@ -421,6 +422,23 @@ public class HiveConnection implements java.sql.Connection { } else { httpClientBuilder = HttpClientBuilder.create(); } + // In case the server's idletimeout is set to a lower value, it might close it's side of + // connection. However we retry one more time on NoHttpResponseException + httpClientBuilder.setRetryHandler(new HttpRequestRetryHandler() { + @Override + public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { + if (executionCount > 1) { + LOG.info("Retry attempts to connect to server exceeded."); + return false; + } + if (exception instanceof org.apache.http.NoHttpResponseException) { + LOG.info("Could not connect to the server. Retrying one more time."); + return true; + } + return false; + } + }); + // Add the request interceptor to the client builder httpClientBuilder.addInterceptorFirst(requestInterceptor); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index a0aea72..c385e2c 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -315,9 +315,11 @@ public class HiveStatement implements java.sql.Statement { isExecuteStatementFailed = false; } catch (SQLException eS) { isExecuteStatementFailed = true; + isLogBeingGenerated = false; throw eS; } catch (Exception ex) { isExecuteStatementFailed = true; + isLogBeingGenerated = false; throw new SQLException(ex.toString(), "08S01", ex); } } @@ -914,10 +916,6 @@ public class HiveStatement implements java.sql.Statement { if (isQueryClosed) { throw new ClosedOrCancelledStatementException("Method getQueryLog() failed. The " + "statement has been closed or cancelled."); - } - if (isExecuteStatementFailed) { - throw new SQLException("Method getQueryLog() failed. Because the stmtHandle in " + - "HiveStatement is null and the statement execution might fail."); } else { return logs; } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java index d4cd79c..90b7368 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java +++ b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hive.jdbc.logs; import org.apache.hive.service.rpc.thrift.TProgressUpdateResp; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-client/pom.xml b/llap-client/pom.xml index 3bacd2b..aa2cf32 100644 --- a/llap-client/pom.xml +++ b/llap-client/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java index e5ab601..42129b7 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java @@ -25,4 +25,5 @@ import org.apache.hadoop.mapred.InputFormat; public interface LlapIo<T> { InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat, Deserializer serde); void close(); + String getMemoryInfo(); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 610c0a5..76fc9c7 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -61,7 +61,7 @@ public class LlapRegistryService extends AbstractService { if (hosts.startsWith("@")) { // Caching instances only in case of the YARN registry. Each host based list will get it's own copy. String appName = hosts.substring(1); - String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); + String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, currentUser()); String key = appName + "-" + userName; registry = yarnRegistries.get(key); if (registry == null || !registry.isInState(STATE.STARTED)) { @@ -79,6 +79,9 @@ public class LlapRegistryService extends AbstractService { return registry; } + public static String currentUser() { + return RegistryUtils.currentUser(); + } @Override public void serviceInit(Configuration conf) { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-common/pom.xml ---------------------------------------------------------------------- diff --git a/llap-common/pom.xml b/llap-common/pom.xml index 334fd75..bb40996 100644 --- a/llap-common/pom.xml +++ b/llap-common/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java new file mode 100644 index 0000000..fa29b59 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.yarn.api.records.Resource; + +public enum LlapDaemonInfo { + INSTANCE; + + private static final class LlapDaemonInfoHolder { + public LlapDaemonInfoHolder(int numExecutors, long executorMemory, long cacheSize, + boolean isDirectCache, boolean isLlapIo) { + this.numExecutors = numExecutors; + this.executorMemory = executorMemory; + this.cacheSize = cacheSize; + this.isDirectCache = isDirectCache; + this.isLlapIo = isLlapIo; + } + + final int numExecutors; + final long executorMemory; + final long cacheSize; + final boolean isDirectCache; + final boolean isLlapIo; + } + + // add more variables as required + private AtomicReference<LlapDaemonInfoHolder> dataRef = + new AtomicReference<LlapDaemonInfoHolder>(); + + public static void initialize(String appName, Configuration daemonConf) { + int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); + long executorMemoryBytes = + HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; + long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); + boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT); + boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true); + initialize(appName, numExecutors, executorMemoryBytes, ioMemoryBytes, isDirectCache, isLlapIo); + } + + public static void initialize(String appName, int numExecutors, long executorMemoryBytes, + long ioMemoryBytes, boolean isDirectCache, boolean isLlapIo) { + INSTANCE.dataRef.set(new LlapDaemonInfoHolder(numExecutors, executorMemoryBytes, ioMemoryBytes, + isDirectCache, isLlapIo)); + } + + public boolean isLlap() { + return dataRef.get() != null; + } + + public int getNumExecutors() { + return dataRef.get().numExecutors; + } + + public long getExecutorMemory() { + return dataRef.get().executorMemory; + } + + public long getMemoryPerExecutor() { + final LlapDaemonInfoHolder data = dataRef.get(); + return (getExecutorMemory() - -(data.isDirectCache ? 0 : data.cacheSize)) / getNumExecutors(); + } + + public long getCacheSize() { + return dataRef.get().cacheSize; + } + + public boolean isDirectCache() { + return dataRef.get().isDirectCache; + } + + public boolean isLlapIo() { + return dataRef.get().isLlapIo; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-ext-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml index 5ba0ec5..d9ea026 100644 --- a/llap-ext-client/pom.xml +++ b/llap-ext-client/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/bin/runLlapDaemon.sh ---------------------------------------------------------------------- diff --git a/llap-server/bin/runLlapDaemon.sh b/llap-server/bin/runLlapDaemon.sh index 001e304..82c2cc5 100755 --- a/llap-server/bin/runLlapDaemon.sh +++ b/llap-server/bin/runLlapDaemon.sh @@ -51,7 +51,7 @@ shift JAVA=$JAVA_HOME/bin/java LOG_LEVEL_DEFAULT="INFO" LOGGER_DEFAULT="console" -JAVA_OPTS_BASE="-server -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+PrintGCDetails -verbose:gc -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=4 -XX:GCLogFileSize=100M -XX:+PrintGCDateStamps" +JAVA_OPTS_BASE="-server -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+PrintGCDetails -verbose:gc -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=4 -XX:GCLogFileSize=100M -XX:+PrintGCDateStamps" if [ ! -d "${LLAP_DAEMON_HOME}" ]; then echo No LLAP_DAEMON_HOME set, or is not a directory. @@ -107,7 +107,7 @@ elif [ "$COMMAND" = "run" ] ; then CLASS='org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon' fi -JAVA_OPTS_BASE="${JAVA_OPTS_BASE} -Xloggc:${LLAP_DAEMON_LOG_DIR}/gc.log" +JAVA_OPTS_BASE="${JAVA_OPTS_BASE} -Xloggc:${LLAP_DAEMON_LOG_DIR}/gc_$(date +%Y-%m-%d-%H).log" LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} ${JAVA_OPTS_BASE}" # Set the default GC option if none set http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/pom.xml ---------------------------------------------------------------------- diff --git a/llap-server/pom.xml b/llap-server/pom.xml index 630e243..b10f05f 100644 --- a/llap-server/pom.xml +++ b/llap-server/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -178,6 +178,10 @@ <version>${slider.version}</version> <exclusions> <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </exclusion> @@ -222,8 +226,8 @@ <artifactId>jettison</artifactId> </exclusion> <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java index ff6e7ce..6cf8dbb 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java @@ -116,7 +116,7 @@ public class IncrementalObjectSizeEstimator { addToProcessing(byType, stack, fieldObj, fieldClass); } } - estimator.directSize = JavaDataModel.alignUp( + estimator.directSize = (int) JavaDataModel.alignUp( estimator.directSize, memoryModel.memoryAlign()); } } @@ -454,7 +454,7 @@ public class IncrementalObjectSizeEstimator { if (len != 0) { int elementSize = getPrimitiveSize(e.field.getType().getComponentType()); arraySize += elementSize * len; - arraySize = JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign()); + arraySize = (int) JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign()); } referencedSize += arraySize; break; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index 8d7f0d3..302918a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicLong; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -43,11 +45,14 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; -public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAllocatorMXBean { +public final class BuddyAllocator + implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapOomDebugDump { private final Arena[] arenas; private final AtomicInteger allocatedArenas = new AtomicInteger(0); private final MemoryManager memoryManager; + private static final long MAX_DUMP_INTERVAL_NS = 300 * 1000000000L; // 5 minutes. + private final AtomicLong lastLog = new AtomicLong(-1); // Config settings private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas; @@ -119,13 +124,14 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca } else { cacheDir = null; } - int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount); + long arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : maxSizeVal / arenaCount; + // The math.min, and the fact that maxAllocation is an int, ensures we don't overflow. arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE)); if (LlapIoImpl.LOG.isInfoEnabled()) { - LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte") + " buffers;" - + (isMapped ? (" memory mapped off " + cacheDir.toString() + "; ") : "") + LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte") + " buffers; " + + (isMapped ? ("memory mapped off " + cacheDir.toString() + "; ") : "") + "allocation sizes " + minAllocation + " - " + maxAllocation - + ", arena size " + arenaSizeVal + ". total size " + maxSizeVal); + + ", arena size " + arenaSizeVal + ", total size " + maxSizeVal); } String minName = ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, @@ -148,7 +154,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca LlapIoImpl.LOG.warn("Rounding arena size to " + arenaSizeVal + " from " + oldArenaSize + " to be divisible by allocation size " + maxAllocation); } - arenaSize = arenaSizeVal; + arenaSize = (int)arenaSizeVal; if ((maxSizeVal % arenaSize) > 0) { long oldMaxSize = maxSizeVal; maxSizeVal = (maxSizeVal / arenaSize) * arenaSize; @@ -191,8 +197,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca int allocLog2 = freeListIx + minAllocLog2; int allocationSize = 1 << allocLog2; // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave? - memoryManager.reserveMemory(dest.length << allocLog2, true); - + memoryManager.reserveMemory(dest.length << allocLog2); int destAllocIx = 0; for (int i = 0; i < dest.length; ++i) { if (dest[i] != null) continue; @@ -241,38 +246,106 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca // into some sort of queues that deallocate and split will examine), or having and "actor" // allocator thread (or threads per arena). // The 2nd one is probably much simpler and will allow us to get rid of a lot of sync code. - // But for now we will just retry 5 times 0_o - for (int attempt = 0; attempt < 5; ++attempt) { - // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once - { - int startArenaIx = (int)((threadId + attempt) % arenaCount), arenaIx = startArenaIx; - do { - int newDestIx = arenas[arenaIx].allocateWithSplit( - arenaIx, freeListIx, dest, destAllocIx, allocationSize); - if (newDestIx == dest.length) return; - assert newDestIx != -1; - destAllocIx = newDestIx; - if ((++arenaIx) == arenaCount) { - arenaIx = 0; + // But for now we will just retry. We will evict more each time. + long forceReserved = 0; + int attempt = 0; + try { + while (true) { + // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once + { + int startArenaIx = (int)((threadId + attempt) % arenaCount), arenaIx = startArenaIx; + do { + int newDestIx = arenas[arenaIx].allocateWithSplit( + arenaIx, freeListIx, dest, destAllocIx, allocationSize); + if (newDestIx == dest.length) return; + assert newDestIx != -1; + destAllocIx = newDestIx; + if ((++arenaIx) == arenaCount) { + arenaIx = 0; + } + } while (arenaIx != startArenaIx); + } + + if (attempt == 0) { + // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare. + for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) { + destAllocIx = arenas[arenaIx].allocateWithExpand( + arenaIx, freeListIx, dest, destAllocIx, allocationSize); + if (destAllocIx == dest.length) return; } - } while (arenaIx != startArenaIx); + } + int numberToForce = (dest.length - destAllocIx) * (attempt + 1); + long newReserved = memoryManager.forceReservedMemory(allocationSize, numberToForce); + forceReserved += newReserved; + if (newReserved == 0) { + // Cannot force-evict anything, give up. + String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + + dest.length + " (entire cache is fragmented and locked, or an internal issue)"; + logOomErrorMessage(msg); + throw new AllocatorOutOfMemoryException(msg); + } + if (attempt == 0) { + LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry"); + } + ++attempt; + } + } finally { + if (attempt > 4) { + LlapIoImpl.LOG.warn("Allocation of " + dest.length + " buffers of size " + size + + " took " + attempt + " attempts to evict enough memory"); + } + // After we succeed (or fail), release the force-evicted memory to memory manager. We have + // previously reserved enough to allocate all we need, so we don't take our allocation out + // of this - as per the comment above, we basically just wasted a bunch of cache (and CPU). + if (forceReserved > 0) { + memoryManager.releaseMemory(forceReserved); } + } + } - if (attempt == 0) { - // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare. - for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) { - destAllocIx = arenas[arenaIx].allocateWithExpand( - arenaIx, freeListIx, dest, destAllocIx, allocationSize); - if (destAllocIx == dest.length) return; - } + private void logOomErrorMessage(String msg) { + while (true) { + long time = System.nanoTime(); + long lastTime = lastLog.get(); + // Magic value usage is invalid with nanoTime, so once in a 1000 years we may log extra. + boolean shouldLog = (lastTime == -1 || (time - lastTime) > MAX_DUMP_INTERVAL_NS); + if (shouldLog && !lastLog.compareAndSet(lastTime, time)) { + continue; } - memoryManager.forceReservedMemory(allocationSize, dest.length - destAllocIx); - LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry " + attempt); + if (shouldLog) { + LlapIoImpl.LOG.error(msg + debugDumpForOom()); + } else { + LlapIoImpl.LOG.error(msg); + } + return; } - String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + dest.length; - LlapIoImpl.LOG.error(msg + "\nALLOCATOR STATE:\n" + debugDump() - + "\nPARENT STATE:\n" + memoryManager.debugDumpForOom()); - throw new AllocatorOutOfMemoryException(msg); + } + + /** + * Arbitrarily, we start getting the state from Allocator. Allocator calls MM which calls + * the policies that call the eviction dispatcher that calls the caches. See init - these all + * are connected in a cycle, so we need to make sure the who-calls-whom order is definite. + */ + @Override + public void debugDumpShort(StringBuilder sb) { + memoryManager.debugDumpShort(sb); + sb.append("\nAllocator state:"); + int unallocCount = 0, fullCount = 0; + long totalFree = 0; + for (Arena arena : arenas) { + Integer result = arena.debugDumpShort(sb); + if (result == null) { + ++unallocCount; + } else if (result == 0) { + ++fullCount; + } else { + totalFree += result; + } + } + sb.append("\nTotal available and allocated: ").append(totalFree).append( + "; unallocated arenas: ").append(unallocCount).append( + "; full arenas ").append(fullCount); + sb.append("\n"); } @Override @@ -299,7 +372,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca return isDirect; } - public String debugDump() { + public String debugDumpForOomInternal() { StringBuilder result = new StringBuilder( "NOTE: with multiple threads the dump is not guaranteed to be consistent"); for (Arena arena : arenas) { @@ -396,6 +469,36 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca } } + public Integer debugDumpShort(StringBuilder result) { + if (data == null) { + return null; + } + int allocSize = minAllocation; + int total = 0; + for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) { + FreeList freeList = freeLists[i]; + freeList.lock.lock(); + try { + int nextHeaderIx = freeList.listHead; + int count = 0; + while (nextHeaderIx >= 0) { + ++count; + nextHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(nextHeaderIx)); + } + if (count > 0) { + if (total == 0) { + result.append("\nArena with free list lengths by size: "); + } + total += (allocSize * count); + result.append(allocSize).append(" => ").append(count).append(", "); + } + } finally { + freeList.lock.unlock(); + } + } + return total; + } + public void debugDump(StringBuilder result) { result.append("\nArena: "); if (data == null) { @@ -677,4 +780,10 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca public MemoryBuffer createUnallocated() { return new LlapDataBuffer(); } + + @Override + public String debugDumpForOom() { + return "\nALLOCATOR STATE:\n" + debugDumpForOomInternal() + + "\nPARENT STATE:\n" + memoryManager.debugDumpForOom(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java index a6b0abd..c73f1a1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; /** * Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches. */ -public final class EvictionDispatcher implements EvictionListener { +public final class EvictionDispatcher implements EvictionListener, LlapOomDebugDump { private final LowLevelCache dataCache; private final SerDeLowLevelCacheImpl serdeCache; private final OrcMetadataCache metadataCache; @@ -65,4 +65,27 @@ public final class EvictionDispatcher implements EvictionListener { public void notifyEvicted(OrcFileEstimateErrors buffer) { metadataCache.notifyEvicted(buffer); } + + @Override + public String debugDumpForOom() { + StringBuilder sb = new StringBuilder(dataCache.debugDumpForOom()); + if (serdeCache != null) { + sb.append(serdeCache.debugDumpForOom()); + } + if (metadataCache != null) { + sb.append(metadataCache.debugDumpForOom()); + } + return sb.toString(); + } + + @Override + public void debugDumpShort(StringBuilder sb) { + dataCache.debugDumpShort(sb); + if (serdeCache != null) { + serdeCache.debugDumpShort(sb); + } + if (metadataCache != null) { + metadataCache.debugDumpShort(sb); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java index 30bf5a9..e861a7e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java @@ -20,4 +20,5 @@ package org.apache.hadoop.hive.llap.cache; public interface LlapOomDebugDump { String debugDumpForOom(); + void debugDumpShort(StringBuilder sb); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java index 19c589a..c5d0c84 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; -public interface LowLevelCache { +public interface LowLevelCache extends LlapOomDebugDump { public enum Priority { NORMAL, HIGH http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java index 4dc1c23..23796f6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java @@ -466,4 +466,43 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla } return sb.toString(); } + + @Override + public void debugDumpShort(StringBuilder sb) { + sb.append("\nORC cache state "); + int allLocked = 0, allUnlocked = 0, allEvicted = 0; + for (Map.Entry<Object, FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> e : + cache.entrySet()) { + if (!e.getValue().incRef()) continue; + try { + int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0; + if (e.getValue().getCache().isEmpty()) continue; + for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) { + int newRc = e2.getValue().incRef(); + if (newRc < 0) { + ++fileEvicted; + continue; + } + try { + if (newRc > 1) { // We hold one refcount. + ++fileLocked; + } else { + ++fileUnlocked; + } + } finally { + e2.getValue().decRef(); + } + } + allLocked += fileLocked; + allUnlocked += fileUnlocked; + allEvicted += fileEvicted; + sb.append("\n file " + e.getKey() + ": " + fileLocked + " locked, " + + fileUnlocked + " unlocked, " + fileEvicted + " evicted"); + } finally { + e.getValue().decRef(); + } + } + sb.append("\nORC cache summary: " + allLocked + " locked, " + + allUnlocked + " unlocked, " + allEvicted + " evicted"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index 88bfa8b..2132574 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -38,25 +38,28 @@ public class LowLevelCacheMemoryManager implements MemoryManager { private long maxSize; public LowLevelCacheMemoryManager( - Configuration conf, LowLevelCachePolicy evictor, LlapDaemonCacheMetrics metrics) { - this(HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE), evictor, metrics); - } - - @VisibleForTesting - public LowLevelCacheMemoryManager( long maxSize, LowLevelCachePolicy evictor, LlapDaemonCacheMetrics metrics) { this.maxSize = maxSize; this.evictor = evictor; this.usedMemory = new AtomicLong(0); this.metrics = metrics; - metrics.setCacheCapacityTotal(maxSize); if (LlapIoImpl.LOG.isInfoEnabled()) { LlapIoImpl.LOG.info("Memory manager initialized with max size {} and" + " {} ability to evict blocks", maxSize, ((evictor == null) ? "no " : "")); } } + @Override + public void reserveMemory(final long memoryToReserve) { + boolean result = reserveMemory(memoryToReserve, true); + if (result) return; + // Can only happen if there's no evictor, or if thread is interrupted. + throw new RuntimeException("Cannot reserve memory" + + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")); + } + + @VisibleForTesting public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) { // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point? int badCallCount = 0; @@ -108,19 +111,19 @@ public class LowLevelCacheMemoryManager implements MemoryManager { usedMem = usedMemory.get(); } } + if (!result) { + releaseMemory(reservedTotalMetric); + reservedTotalMetric = 0; + } metrics.incrCacheCapacityUsed(reservedTotalMetric - evictedTotalMetric); return result; } @Override - public void forceReservedMemory(int allocationSize, int count) { - if (evictor == null) return; - while (count > 0) { - int evictedCount = evictor.tryEvictContiguousData(allocationSize, count); - if (evictedCount == 0) return; - count -= evictedCount; - } + public long forceReservedMemory(int allocationSize, int count) { + if (evictor == null) return 0; + return evictor.tryEvictContiguousData(allocationSize, count); } @Override @@ -136,7 +139,13 @@ public class LowLevelCacheMemoryManager implements MemoryManager { @Override public String debugDumpForOom() { if (evictor == null) return null; - return "cache state\n" + evictor.debugDumpForOom(); + return "\ncache state\n" + evictor.debugDumpForOom(); + } + + @Override + public void debugDumpShort(StringBuilder sb) { + if (evictor == null) return; + evictor.debugDumpShort(sb); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java index bb1d1b0..fd9d942 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java @@ -28,5 +28,5 @@ public interface LowLevelCachePolicy extends LlapOomDebugDump { void setEvictionListener(EvictionListener listener); void setParentDebugDumper(LlapOomDebugDump dumper); /** TODO: temporary method until we have a better allocator */ - int tryEvictContiguousData(int allocationSize, int count); + long tryEvictContiguousData(int allocationSize, int count); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java index 430a5f8..761fd00 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java @@ -34,7 +34,7 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy { private EvictionListener evictionListener; private LlapOomDebugDump parentDebugDump; - public LowLevelFifoCachePolicy(Configuration conf) { + public LowLevelFifoCachePolicy() { LlapIoImpl.LOG.info("FIFO cache policy"); buffers = new LinkedList<LlapCacheableBuffer>(); } @@ -116,10 +116,26 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy { } @Override - public int tryEvictContiguousData(int allocationSize, int count) { + public void debugDumpShort(StringBuilder sb) { + sb.append("\nFIFO eviction list: "); + lock.lock(); + try { + sb.append(buffers.size()).append(" elements)"); + } finally { + lock.unlock(); + } + if (parentDebugDump != null) { + parentDebugDump.debugDumpShort(sb); + } + } + + @Override + public long tryEvictContiguousData(int allocationSize, int count) { long evicted = evictInternal(allocationSize * count, allocationSize); - // This makes granularity assumptions. - assert evicted % allocationSize == 0; - return (int)(evicted / allocationSize); + int remainingCount = count - (int)(evicted / allocationSize); + if (remainingCount > 0) { + evicted += evictInternal(allocationSize * remainingCount, -1); + } + return evicted; } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 4cd2c18..3973c8a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -69,12 +69,6 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { private EvictionListener evictionListener; private LlapOomDebugDump parentDebugDump; - public LowLevelLrfuCachePolicy(Configuration conf) { - this((int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC), - HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE), conf); - } - - @VisibleForTesting public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) { lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize); @@ -210,13 +204,14 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { } @Override - public int tryEvictContiguousData(int allocationSize, int count) { + public long tryEvictContiguousData(int allocationSize, int count) { int evicted = evictDataFromList(allocationSize, count); - count -= evicted; - if (count > 0) { - evicted += evictDataFromHeap(timer.get(), count, allocationSize); - } - return evicted; + if (count <= evicted) return evicted * allocationSize; + evicted += evictDataFromHeap(timer.get(), count - evicted, allocationSize); + long evictedBytes = evicted * allocationSize; + if (count <= evicted) return evictedBytes; + evictedBytes += evictSomeBlocks(allocationSize * (count - evicted)); + return evictedBytes; } private long evictFromList(long memoryToReserve) { @@ -573,4 +568,26 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { } return result; } + + @Override + public void debugDumpShort(StringBuilder sb) { + sb.append("\nLRFU eviction list: "); + LlapCacheableBuffer listHeadLocal = listHead, listTailLocal = listTail; + if (listHeadLocal == null) { + sb.append("0 items"); + } else { + LlapCacheableBuffer listItem = listHeadLocal; + int c = 0; + while (listItem != null) { + ++c; + if (listItem == listTailLocal) break; + listItem = listItem.next; + } + sb.append(c + " items"); + } + sb.append("\nLRFU eviction heap: " + heapSize + " items"); + if (parentDebugDump != null) { + parentDebugDump.debugDumpShort(sb); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java index ca41825..0f4d3c0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java @@ -19,9 +19,9 @@ package org.apache.hadoop.hive.llap.cache; public interface MemoryManager extends LlapOomDebugDump { - boolean reserveMemory(long memoryToReserve, boolean waitForEviction); void releaseMemory(long memUsage); void updateMaxSize(long maxSize); /** TODO: temporary method until we get a better allocator. */ - void forceReservedMemory(int allocationSize, int count); + long forceReservedMemory(int allocationSize, int count); + void reserveMemory(long memoryToReserve); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java index 4809398..cd5bc9b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -44,7 +43,7 @@ import org.apache.orc.OrcProto.ColumnEncoding; import com.google.common.base.Function; -public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugDump { +public class SerDeLowLevelCacheImpl implements LlapOomDebugDump { private static final int DEFAULT_CLEANUP_INTERVAL = 600; private final Allocator allocator; private final AtomicInteger newEvictions = new AtomicInteger(0); @@ -617,18 +616,6 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD } } - @Override - public void decRefBuffer(MemoryBuffer buffer) { - unlockBuffer((LlapDataBuffer)buffer, true); - } - - @Override - public void decRefBuffers(List<MemoryBuffer> cacheBuffers) { - for (MemoryBuffer b : cacheBuffers) { - unlockBuffer((LlapDataBuffer)b, true); - } - } - private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) { boolean isLastDecref = (buffer.decRef() == 0); if (handleLastDecRef && isLastDecref) { @@ -704,18 +691,6 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD } @Override - public boolean incRefBuffer(MemoryBuffer buffer) { - // notifyReused implies that buffer is already locked; it's also called once for new - // buffers that are not cached yet. Don't notify cache policy. - return lockBuffer(((LlapDataBuffer)buffer), false); - } - - @Override - public Allocator getAllocator() { - return allocator; - } - - @Override public String debugDumpForOom() { StringBuilder sb = new StringBuilder("File cache state "); for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) { @@ -731,4 +706,55 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD } return sb.toString(); } + + + @Override + public void debugDumpShort(StringBuilder sb) { + sb.append("\nSerDe cache state "); + int allLocked = 0, allUnlocked = 0, allEvicted = 0; + for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) { + if (!e.getValue().incRef()) continue; + try { + FileData fd = e.getValue().getCache(); + int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0; + sb.append(fd.colCount).append(" columns, ").append(fd.stripes.size()).append(" stripes; "); + for (StripeData stripe : fd.stripes) { + if (stripe.data == null) continue; + for (int i = 0; i < stripe.data.length; ++i) { + LlapDataBuffer[][] colData = stripe.data[i]; + if (colData == null) continue; + for (int j = 0; j < colData.length; ++j) { + LlapDataBuffer[] streamData = colData[j]; + if (streamData == null) continue; + for (int k = 0; k < streamData.length; ++k) { + int newRc = streamData[k].incRef(); + if (newRc < 0) { + ++fileEvicted; + continue; + } + try { + if (newRc > 1) { // We hold one refcount. + ++fileLocked; + } else { + ++fileUnlocked; + } + } finally { + streamData[k].decRef(); + } + } + } + } + } + allLocked += fileLocked; + allUnlocked += fileUnlocked; + allEvicted += fileEvicted; + sb.append("\n file " + e.getKey() + ": " + fileLocked + " locked, " + + fileUnlocked + " unlocked, " + fileEvicted + " evicted"); + } finally { + e.getValue().decRef(); + } + } + sb.append("\nSerDe cache summary: " + allLocked + " locked, " + + allUnlocked + " unlocked, " + allEvicted + " evicted"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java index d8f59d1..51eb34e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java @@ -32,9 +32,10 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean { private final boolean isDirect; private static Field cleanerField; static { - ByteBuffer tmp = ByteBuffer.allocateDirect(1); try { - cleanerField = tmp.getClass().getDeclaredField("cleaner"); + // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760 + final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer"); + cleanerField = dbClazz.getDeclaredField("cleaner"); cleanerField.setAccessible(true); } catch (Throwable t) { LlapIoImpl.LOG.warn("Cannot initialize DirectByteBuffer cleaner", t); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java index d1eee04..af7cf3d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java @@ -102,4 +102,14 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache { public void notifyEvicted(MemoryBuffer buffer) { throw new UnsupportedOperationException("Buffer manager doesn't have cache"); } + + @Override + public String debugDumpForOom() { + return ""; + } + + @Override + public void debugDumpShort(StringBuilder sb) { + // TODO Auto-generated method stub + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index 22e5ee8..a662c75 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -76,7 +76,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.eclipse.jetty.server.ssl.SslSocketConnector; +import org.eclipse.jetty.rewrite.handler.Rule; +import org.eclipse.jetty.util.ssl.SslContextFactory; import org.joda.time.DateTime; import org.json.JSONException; import org.json.JSONObject; @@ -378,7 +379,8 @@ public class LlapServiceDriver { LlapTezUtils.class, // llap-tez LlapInputFormat.class, // llap-server HiveInputFormat.class, // hive-exec - SslSocketConnector.class, // hive-common (https deps) + SslContextFactory.class, // hive-common (https deps) + Rule.class, // Jetty rewrite class RegistryUtils.ServiceRecordMarshal.class, // ZK registry // log4j2 com.lmax.disruptor.RingBuffer.class, // disruptor http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java index 8342067..2d0121c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java @@ -24,7 +24,11 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.slider.api.types.ApplicationDiagnostics; import org.apache.slider.client.SliderClient; import org.apache.slider.common.params.ActionCreateArgs; import org.apache.slider.common.params.ActionDestroyArgs; @@ -58,6 +62,60 @@ public class LlapSliderUtils { return sliderClient; } + public static ApplicationReport getAppReport(String appName, SliderClient sliderClient, + long timeoutMs) throws + LlapStatusServiceDriver.LlapStatusCliException { + Clock clock = new SystemClock(); + long startTime = clock.getTime(); + long timeoutTime = timeoutMs < 0 ? Long.MAX_VALUE : (startTime + timeoutMs); + ApplicationReport appReport = null; + + while (appReport == null) { + try { + appReport = sliderClient.getYarnAppListClient().findInstance(appName); + if (timeoutMs == 0) { + // break immediately if timeout is 0 + break; + } + // Otherwise sleep, and try again. + if (appReport == null) { + long remainingTime = Math.min(timeoutTime - clock.getTime(), 500l); + if (remainingTime > 0) { + Thread.sleep(remainingTime); + } else { + break; + } + } + } catch (Exception e) { // No point separating IOException vs YarnException vs others + throw new LlapStatusServiceDriver.LlapStatusCliException( + LlapStatusServiceDriver.ExitCode.YARN_ERROR, + "Failed to get Yarn AppReport", e); + } + } + return appReport; + } + + public static ApplicationDiagnostics getApplicationDiagnosticsFromYarnDiagnostics( + ApplicationReport appReport, Logger LOG) { + if (appReport == null) { + return null; + } + String diagnostics = appReport.getDiagnostics(); + if (diagnostics == null || diagnostics.isEmpty()) { + return null; + } + try { + ApplicationDiagnostics appDiagnostics = + ApplicationDiagnostics.fromJson(diagnostics); + return appDiagnostics; + } catch (IOException e) { + LOG.warn( + "Failed to parse application diagnostics from Yarn Diagnostics - {}", + diagnostics); + return null; + } + } + public static void startCluster(Configuration conf, String name, String packageName, Path packageDir, String queue) { LOG.info("Starting cluster with " + name + ", " http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java index b4aa430..bd91495 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java @@ -38,6 +38,7 @@ public class LlapStatusOptionsProcessor { private static final long DEFAULT_STATUS_REFRESH_INTERVAL_MS = 1 * 1000l; // 1 seconds wait until subsequent status private static final long DEFAULT_WATCH_MODE_TIMEOUT_MS = 5 * 60 * 1000l; // 5 minutes timeout for watch mode private static final float DEFAULT_RUNNING_NODES_THRESHOLD = 1.0f; + enum OptionConstants { NAME("name", 'n', "LLAP cluster name", true),
