SourabhBadhya commented on code in PR #4060: URL: https://github.com/apache/hive/pull/4060#discussion_r1213935645
########## beeline/src/java/org/apache/hive/beeline/schematool/tasks/HiveUpdateTask.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.beeline.schematool.tasks; + +import com.google.common.collect.Sets; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.HiveMetaException; +import org.apache.hadoop.hive.metastore.tools.schematool.SchemaToolCommandLine; +import org.apache.hadoop.hive.metastore.SchemaInfo; +import org.apache.hadoop.hive.metastore.tools.schematool.task.SchemaToolTask; +import org.apache.hadoop.hive.metastore.tools.schematool.task.TaskContext; +import org.apache.hive.beeline.schematool.HiveSchemaTool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Set; + +class HiveUpdateTask extends SchemaToolTask { + + private static final Logger LOG = LoggerFactory.getLogger(HiveUpdateTask.class); + + private static final String VERSION_UPGRADE_SCRIPT = + "USE SYS\n" + + "DROP TABLE IF EXISTS `VERSION`\n" + + "%s\n" + + "USE 
INFORMATION_SCHEMA\n" + + "DROP TABLE IF EXISTS `VERSION`\n" + + "%s\n" + + "SELECT 'Finished upgrading MetaStore schema to %s'\n"; + + @Override + protected Set<String> usedCommandLineArguments() { + return Sets.newHashSet("initSchemaTo", "dryRun"); + } + + @Override + protected void execute(TaskContext context) throws HiveMetaException { + String fromVersion = getFromVersion(context); + + SchemaInfo schemaInfo = context.getSchemaInfo(); + String toVersion = context.getCommandLine().getOptionValue("initSchemaTo"); + if (StringUtils.isBlank(toVersion)) { + toVersion = SchemaInfo.getRequiredHiveSchemaVersion(); + } + + if (toVersion.equals(fromVersion)) { + System.out.println("No schema upgrade required from version " + fromVersion); + return; + } + + if (fromVersion.equals(HiveSchemaInfo.INITIAL_VERSION)) { + System.out.println("Initializing schema"); + } else { + System.out.println("Starting upgrade metastore schema from version " + fromVersion + " to " + toVersion); + } + + // Find the list of scripts to execute for this upgrade + List<String> upgradeScripts = schemaInfo.getUnappliedScripts(); + + String scriptDir = schemaInfo.getMetaStoreScriptDir(); + try { + for (String scriptFile : upgradeScripts) { + System.out.println("Upgrade script " + scriptFile); + if (!context.getCommandLine().hasOption("dryRun")) { + context.getScriptExecutor().execSql(scriptDir, scriptFile); + System.out.println("Completed " + scriptFile); + } + } + + //Update schema version + File scriptFile = File.createTempFile("hiveVesionScript", "sql"); Review Comment: Typo: hiveVersionScript ########## beeline/src/java/org/apache/hive/beeline/schematool/tasks/HiveTaskProvider.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.beeline.schematool.tasks; + +import org.apache.hadoop.hive.metastore.tools.schematool.HiveSchemaHelper; +import org.apache.hadoop.hive.metastore.tools.schematool.task.SchemaToolTask; +import org.apache.hadoop.hive.metastore.tools.schematool.task.SchemaToolTaskProvider; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +public class HiveTaskProvider implements SchemaToolTaskProvider { + + /** + * The map contains {@link Supplier} lambdas, so only the required {@link SchemaToolTask}s are instantiated. + */ + private final Map<String, Supplier<SchemaToolTask>> taskSuppliers = new HashMap<>(); + + @Override + public SchemaToolTask getTask(String command) { + return taskSuppliers.getOrDefault(command, () -> null).get(); + } + + @Override + public Set<String> getSupportedDatabases() { + return new HashSet<>(Collections.singletonList(HiveSchemaHelper.DB_HIVE)); + } + + public HiveTaskProvider(SchemaToolTaskProvider embeddedHmsTaskProvider) { + taskSuppliers.put("initSchema", () -> new HiveContextTask().addChild(new HiveUpdateTask().addChild(embeddedHmsTaskProvider.getTask("info")))); Review Comment: I think strings like "initSchema", "initSchemaTo", etc. should be constants, since the same strings are used elsewhere.
########## beeline/src/java/org/apache/hive/beeline/schematool/tasks/HiveSchemaInfo.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.beeline.schematool.tasks; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaException; +import org.apache.hadoop.hive.metastore.SchemaInfo; +import org.apache.hadoop.hive.metastore.tools.schematool.HiveSchemaHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +class HiveSchemaInfo extends SchemaInfo { + + private static final Logger LOG = LoggerFactory.getLogger(HiveSchemaInfo.class); + private static final String UPGRADE_FILE_PREFIX = "upgrade-"; + protected static final String VERSION_UPGRADE_LIST = "upgrade.order"; + protected static final String INITIAL_VERSION = "0.0.0"; + + String[] hiveSchemaVersions = null; + + @Override + public 
List<String> getUnappliedScripts() throws HiveMetaException { + if (hiveSchemaVersions == null) { + loadAllUpgradeScripts(); + } + String schemaVersion = INITIAL_VERSION; + try { + schemaVersion = getSchemaVersion(); + } catch (HiveMetaException e) { + if (e.getMessage().startsWith("Failed to get schema version")) { + LOG.warn("Unable to get schema version, assuming it's an empty schema."); + } else throw e; + } + List <String> upgradeScriptList = new ArrayList<>(); + + // check if we are already at the required schema level + if (isVersionCompatible(getRequiredHiveSchemaVersion(), schemaVersion)) { + return upgradeScriptList; + } + + // Find the list of scripts to execute for this upgrade + int firstScript = -1; + for (int i=0; i < hiveSchemaVersions.length; i++) { + if (hiveSchemaVersions[i].startsWith(schemaVersion + "-to-")) { + firstScript = i; + } + } + if (firstScript == -1) { + throw new HiveMetaException("Unknown version specified for upgrade " + schemaVersion + " Metastore schema may be too old or newer"); + } + + for (int i=firstScript; i < hiveSchemaVersions.length; i++) { + String scriptFile = generateUpgradeFileName(hiveSchemaVersions[i]); + upgradeScriptList.add(scriptFile); + } + return upgradeScriptList; + } + + @Override + public List<String> getAppliedScripts() throws HiveMetaException { + if (hiveSchemaVersions == null) { + loadAllUpgradeScripts(); + } + List <String> upgradeScriptList = new ArrayList<>(); + String schemaVersion; + try { + schemaVersion = getSchemaVersion(); + } catch (HiveMetaException e) { + if (e.getMessage().startsWith("Failed to get schema version")) { + LOG.warn("Unable to get schema version, assuming it's an empty schema."); + return upgradeScriptList; + } else throw e; + } + + // Find the list of scripts to execute for this upgrade + for (String hiveSchemaVersion : hiveSchemaVersions) { + if (hiveSchemaVersion.startsWith(schemaVersion + "-to-")) { + break; + } else { + String scriptFile = 
generateUpgradeFileName(hiveSchemaVersion); + upgradeScriptList.add(scriptFile); + } + } + return upgradeScriptList; + } + + @Override + public String getSchemaVersion() throws HiveMetaException { + try (Connection metastoreDbConnection = HiveSchemaHelper.getConnectionToMetastore(connectionInfo, conf, "SYS"); + Statement stmt = metastoreDbConnection.createStatement()) { + ResultSet res = stmt.executeQuery("select t.SCHEMA_VERSION from VERSION t"); + if (!res.next()) { + throw new HiveMetaException("Could not find version info in Hive VERSION table."); + } + String currentSchemaVersion = res.getString(1); + if (res.next()) { + throw new HiveMetaException("Multiple versions were found in version table."); + } + return currentSchemaVersion; + } catch (SQLException e) { + throw new HiveMetaException("Failed to get schema version, Cause:" + e.getMessage()); + } + } + + @Override + public String getCreateUserScript() throws HiveMetaException { + String createScript = CREATE_USER_PREFIX + "." + connectionInfo + SQL_FILE_EXTENSION; + File scriptFile = new File(getMetaStoreScriptDir() + File.separatorChar + createScript); + // check if the file exists + if (!scriptFile.exists()) { + throw new HiveMetaException("Unable to find create user file, expected: " + scriptFile.getAbsolutePath()); + } + return createScript; + } + + // format the upgrade script name eg upgrade-x-y-dbType.sql + private String generateUpgradeFileName(String fileVersion) { + return UPGRADE_FILE_PREFIX + fileVersion + "." + connectionInfo.getDbType() + SQL_FILE_EXTENSION; + } + + private void loadAllUpgradeScripts() throws HiveMetaException { + // load upgrade order for the given dbType + List<String> upgradeOrderList = new ArrayList<>(); + String upgradeListFile = getMetaStoreScriptDir() + File.separator + VERSION_UPGRADE_LIST + "." 
+ connectionInfo.getDbType(); + try (FileReader fr = new FileReader(upgradeListFile); + BufferedReader bfReader = new BufferedReader(fr)) { + String currSchemaVersion; + while ((currSchemaVersion = bfReader.readLine()) != null) { + upgradeOrderList.add(currSchemaVersion.trim()); + } + } catch (FileNotFoundException e) { + throw new HiveMetaException("File " + upgradeListFile + " not found ", e); + } catch (IOException e) { + throw new HiveMetaException("Error reading " + upgradeListFile, e); + } + hiveSchemaVersions = upgradeOrderList.toArray(new String[0]); + } + + + public HiveSchemaInfo(String metastoreHome, HiveSchemaHelper.MetaStoreConnectionInfo connectionInfo, Configuration conf) throws HiveMetaException { Review Comment: Is HiveMetaException thrown here? I think this can be removed. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/commandparser/AbstractCommandParser.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.tools.schematool.commandparser; + +import com.google.common.collect.Lists; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.IllegalFormatException; +import java.util.List; + +/** + * Base implementation of NestedScriptParser abstractCommandParser. + */ +abstract class AbstractCommandParser implements NestedScriptParser { + + private List<String> dbOpts; + // Depending on whether we are using beeline or sqlline the line endings have to be handled + // differently. + private final boolean usingSqlLine; + + public AbstractCommandParser(String dbOpts, boolean usingSqlLine) { + setDbOpts(dbOpts); + this.usingSqlLine = usingSqlLine; + } + + @Override + public boolean isPartialCommand(String dbCommand) throws IllegalArgumentException{ + if (dbCommand == null || dbCommand.isEmpty()) { + throw new IllegalArgumentException("invalid command line " + dbCommand); + } + dbCommand = dbCommand.trim(); + if (dbCommand.endsWith(getDelimiter()) || isNonExecCommand(dbCommand)) { + return false; + } else { + return true; + } Review Comment: This can be simplified to - return !(dbCommand.endsWith(getDelimiter()) || isNonExecCommand(dbCommand)); ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/liquibase/LiquibaseUpdateToTask.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.tools.schematool.liquibase; + +import com.google.common.collect.Sets; +import liquibase.Contexts; +import liquibase.LabelExpression; +import liquibase.Labels; +import liquibase.Liquibase; +import liquibase.changelog.ChangeSetStatus; +import liquibase.exception.LiquibaseException; +import org.apache.hadoop.hive.metastore.HiveMetaException; +import org.apache.hadoop.hive.metastore.tools.schematool.task.SchemaToolTask; +import org.apache.hadoop.hive.metastore.tools.schematool.task.TaskContext; + +import java.io.PrintWriter; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This task utilizes {@link Liquibase} to init or upgrade the HMS schema up to a specific version. Since Liquibase + * doesn't maintain a database version, it is not possible to do this migration out of the box. In order to be able to + * run only a part of the migration scripts, labels are used. Each script has a version label, and the labels having + * less or equal version than 'initSchemaTo' are passed to {@link Liquibase#update(Contexts, LabelExpression)} as inlcusive + * filters. This requires that labels must be a parseable valid version + * (verified by {@link org.apache.hadoop.hive.metastore.SchemaInfo#isValidVersion(String)}). 
+ */ +class LiquibaseUpdateToTask extends SchemaToolTask { + + @Override + protected Set<String> usedCommandLineArguments() { + return Sets.newHashSet("initSchemaTo", "dryRun"); + } + + @Override + protected void execute(TaskContext context) throws HiveMetaException { + try { + String toVersion = context.getCommandLine().getOptionValue("initSchemaTo"); + + Liquibase liquibase = context.getLiquibase(); + Contexts liquibaseContexts = context.getLiquibaseContext(); + + liquibase.getLog().info("Starting metastore schema upgrade to version: " + toVersion); + + List<ChangeSetStatus> unappliedChanges = liquibase + .getChangeSetStatuses(liquibaseContexts, null, false) + .stream() + .filter(s -> !s.getPreviouslyRan()) + .collect(Collectors.toList()); + + StringBuilder logEntry = new StringBuilder("The following scripts will be applied: "); + StringBuilder labelFilter = new StringBuilder(); + boolean foundToVersion = false; + for (int i = 0; i < unappliedChanges.size(); i++) { + logEntry.append(unappliedChanges.get(i).getChangeSet().getFilePath()); + + Labels currentVerison = unappliedChanges.get(i).getChangeSet().getLabels(); + + if (labelFilter.length() > 0) { + labelFilter.append(" OR "); + logEntry.append(", "); + } + + labelFilter.append(String.join(" OR ", currentVerison.getLabels())); + + if (currentVerison.getLabels().contains(toVersion)) { + foundToVersion = true; + break; + } + } + if (!foundToVersion) { + throw new HiveMetaException("The required version (" + toVersion + ") could not be found among the version scripts!"); + } + liquibase.getLog().info(logEntry.toString()); + + if (context.getCommandLine().hasOption("dryRun")) { + liquibase.update(liquibaseContexts, new LabelExpression(labelFilter.toString()), new PrintWriter(System.out)); + } else { + liquibase.update(liquibaseContexts, new LabelExpression(labelFilter.toString())); + } + + liquibase.getLog().info("Metastore schema upgraded to version: " + toVersion); + } catch (LiquibaseException e) { + throw new 
HiveMetaException("Schema upgrade FAILED! Metastore state would be inconsistent!", e); Review Comment: I see this string in multiple places. It would be ideal to extract it into a constant. ########## beeline/src/java/org/apache/hive/beeline/schematool/tasks/BeelineScriptExecutor.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.beeline.schematool.tasks; + +import org.apache.commons.io.output.NullOutputStream; +import org.apache.hadoop.hive.metastore.tools.schematool.CommandBuilder; +import org.apache.hadoop.hive.metastore.tools.schematool.HiveSchemaHelper; +import org.apache.hadoop.hive.metastore.tools.schematool.commandparser.NestedScriptParser; +import org.apache.hadoop.hive.metastore.tools.schematool.scriptexecution.ScriptExecutor; +import org.apache.hive.beeline.BeeLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintStream; + +/** + * {@link BeeLine} based {@link ScriptExecutor} implementation. Able to execute the given scripts using {@link BeeLine} internally. + * Can be used to execute scripts against the Hive schema.
+ */ +class BeelineScriptExecutor implements ScriptExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(BeelineScriptExecutor.class); + private static final String LINE_SEPARATOR = "line.separator"; + + private final NestedScriptParser dbCommandParser; + private final CommandBuilder commandBuilder; + + @Override + public void execSql(String scriptDir, String sqlScriptFile) throws IOException { + // expand the nested script + // If the metaDbType is set, this is setting up the information + // schema in Hive. That specifically means that the sql commands need + // to be adjusted for the underlying RDBMS (correct quotation + // strings, etc). + String sqlCommands = dbCommandParser.buildCommand(scriptDir, sqlScriptFile, true); + File tmpFile = File.createTempFile("schematool", ".sql"); + tmpFile.deleteOnExit(); + + // write out the buffer into a file. Add beeline commands for autocommit and close + FileWriter fstream = new FileWriter(tmpFile.getPath()); + try (BufferedWriter out = new BufferedWriter(fstream)) { + if (!commandBuilder.getConnectionInfo().getDbType().equalsIgnoreCase(HiveSchemaHelper.DB_HIVE)) { + out.write("!autocommit off" + System.getProperty(LINE_SEPARATOR)); + out.write(sqlCommands); + out.write("!commit" + System.getProperty(LINE_SEPARATOR)); + } else { + out.write("!autocommit on" + System.getProperty(LINE_SEPARATOR)); + out.write(sqlCommands); + } + out.write("!closeall" + System.getProperty(LINE_SEPARATOR)); + } + execSql(tmpFile.getPath()); + } + + public void execSql(String sqlScriptFile) throws IOException { + // run the script using Beeline + try (BeeLine beeLine = new BeeLine()) { + if (!commandBuilder.isVerbose()) { + beeLine.setOutputStream(new PrintStream(new NullOutputStream())); Review Comment: Creating a new NullOutputStream instance is deprecated. Can we use the singleton offered by NullOutputStream instead?
`beeLine.setOutputStream(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM));` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
