This is an automated email from the ASF dual-hosted git repository. jcamacho pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit 619826ae098c2bd6fc6bb01f753df9bbc301b4e5 Author: Jesus Camacho Rodriguez <[email protected]> AuthorDate: Thu Jul 11 20:51:07 2019 -0700 Revert "HIVE-21959: Clean up Concatenate and Msck DDL commands (Miklos Gergely, reviewed by Zoltan Haindrich)" This reverts commit 12712d5bdc70bd85e1d668b5aedf71cacb17c83f. --- .../apache/hadoop/hive/ql/ddl/misc/MsckDesc.java | 13 ++-- .../hadoop/hive/ql/ddl/misc/MsckOperation.java | 24 +------ .../table/storage/AlterTableConcatenateDesc.java | 68 +++++++++++------- .../storage/AlterTableConcatenateOperation.java | 83 +++++++++------------- .../apache/hadoop/hive/ql/exec/MapOperator.java | 4 +- .../org/apache/hadoop/hive/ql/exec/Utilities.java | 13 ++-- .../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 2 +- .../hive/ql/exec/vector/VectorMapOperator.java | 5 +- .../hadoop/hive/ql/io/CombineHiveInputFormat.java | 14 ++-- .../hadoop/hive/ql/io/CombineHiveRecordReader.java | 3 +- .../hadoop/hive/ql/io/HiveFileFormatUtils.java | 21 ++++-- .../apache/hadoop/hive/ql/io/HiveInputFormat.java | 7 +- .../hadoop/hive/ql/io/SymbolicInputFormat.java | 4 +- .../hadoop/hive/ql/io/merge/MergeFileWork.java | 3 +- .../hive/ql/io/parquet/ProjectionPusher.java | 4 +- .../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 22 +++--- .../hadoop/hive/ql/optimizer/MapJoinProcessor.java | 6 +- .../physical/AbstractJoinTaskDispatcher.java | 8 ++- .../physical/CommonJoinTaskDispatcher.java | 4 +- .../optimizer/physical/NullScanTaskDispatcher.java | 6 +- .../physical/SortMergeJoinTaskDispatcher.java | 5 +- .../hive/ql/optimizer/physical/Vectorizer.java | 8 +-- .../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java | 21 ++++-- .../hadoop/hive/ql/parse/MapReduceCompiler.java | 7 +- .../apache/hadoop/hive/ql/parse/TezCompiler.java | 2 +- .../hadoop/hive/ql/parse/spark/SparkCompiler.java | 2 +- .../ql/plan/ConditionalResolverCommonJoin.java | 16 +++-- .../ql/plan/ConditionalResolverMergeFiles.java | 4 +- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 49 ++++++------- .../hadoop/hive/ql/exec/TestGetInputSummary.java | 8 ++- .../apache/hadoop/hive/ql/exec/TestOperators.java | 4 +- .../org/apache/hadoop/hive/ql/exec/TestPlan.java | 4 +- .../apache/hadoop/hive/ql/exec/TestUtilities.java | 4 +- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 6 +- .../hive/ql/io/orc/TestInputOutputFormat.java | 4 +- .../ql/plan/TestConditionalResolverCommonJoin.java | 4 +- .../apache/hadoop/hive/ql/plan/TestMapWork.java | 7 +- .../results/clientpositive/llap/orc_merge10.q.out | 2 +- .../results/clientpositive/llap/orc_merge6.q.out | 2 +- .../results/clientpositive/llap/orc_merge7.q.out | 2 +- .../clientpositive/llap/orc_merge_incompat2.q.out | 2 +- .../test/results/clientpositive/orc_merge10.q.out | 2 +- .../test/results/clientpositive/orc_merge6.q.out | 2 +- .../clientpositive/orc_merge_incompat2.q.out | 2 +- .../results/clientpositive/spark/orc_merge6.q.out | 2 +- .../results/clientpositive/spark/orc_merge7.q.out | 2 +- .../clientpositive/spark/orc_merge_incompat2.q.out | 2 +- .../org/apache/hadoop/hive/metastore/Msck.java | 10 ++- .../org/apache/hadoop/hive/metastore/MsckInfo.java | 70 +++++++++++++----- .../hive/metastore/PartitionManagementTask.java | 2 +- 50 files changed, 315 insertions(+), 256 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckDesc.java index 4f6f31e..32a51fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckDesc.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.ddl.misc; import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -34,16 +36,19 @@ public class MsckDesc implements DDLDesc, Serializable { private static final long serialVersionUID = 1L; private final String tableName; - private final List<Map<String, String>> partitionsSpecs; + private final ArrayList<LinkedHashMap<String, String>> partitionsSpecs; private final String resFile; private final boolean repairPartitions; private final boolean addPartitions; private final boolean dropPartitions; - public MsckDesc(String tableName, List<Map<String, String>> partitionsSpecs, Path resFile, + public MsckDesc(String tableName, List<? extends Map<String, String>> partitionSpecs, Path resFile, boolean repairPartitions, boolean addPartitions, boolean dropPartitions) { this.tableName = tableName; - this.partitionsSpecs = partitionsSpecs; + this.partitionsSpecs = new ArrayList<LinkedHashMap<String, String>>(partitionSpecs.size()); + for (Map<String, String> partSpec : partitionSpecs) { + this.partitionsSpecs.add(new LinkedHashMap<>(partSpec)); + } this.resFile = resFile.toString(); this.repairPartitions = repairPartitions; this.addPartitions = addPartitions; @@ -56,7 +61,7 @@ public class MsckDesc implements DDLDesc, Serializable { } @Explain(displayName = "partitions specs", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public List<Map<String, String>> getPartitionsSpecs() { + public ArrayList<LinkedHashMap<String, String>> getPartitionsSpecs() { return partitionsSpecs; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckOperation.java index ab8cf46..dea0a05 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckOperation.java @@ -23,19 +23,13 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import java.io.IOException; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.Msck; import org.apache.hadoop.hive.metastore.MsckInfo; -import org.apache.hadoop.hive.metastore.PartitionManagementTask; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.ddl.DDLOperation; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.thrift.TException; /** * Operation process of metastore check. @@ -49,29 +43,15 @@ public class MsckOperation extends DDLOperation<MsckDesc> { } @Override - public int execute() throws HiveException, IOException, TException { + public int execute() throws HiveException, IOException { try { Msck msck = new Msck(false, false); msck.init(context.getDb().getConf()); String[] names = Utilities.getDbTableName(desc.getTableName()); - - long partitionExpirySeconds = -1L; - try (HiveMetaStoreClient msc = new HiveMetaStoreClient(context.getConf())) { - Table table = msc.getTable(SessionState.get().getCurrentCatalog(), names[0], names[1]); - String qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); - boolean msckEnablePartitionRetention = context.getConf().getBoolean( - MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION.getHiveName(), false); - if (msckEnablePartitionRetention) { - partitionExpirySeconds = PartitionManagementTask.getRetentionPeriodInSeconds(table); - LOG.info("{} - Retention period ({}s) for partition is enabled for MSCK REPAIR..", qualifiedTableName, - partitionExpirySeconds); - } - } - MsckInfo msckInfo = new MsckInfo(SessionState.get().getCurrentCatalog(), names[0], names[1], desc.getPartitionsSpecs(), desc.getResFile(), desc.isRepairPartitions(), desc.isAddPartitions(), - desc.isDropPartitions(), partitionExpirySeconds); + desc.isDropPartitions(), -1); return msck.repair(msckInfo); } catch (MetaException e) { LOG.error("Unable to create msck instance.", e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateDesc.java index 281fcbf..64ce2fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateDesc.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; @@ -32,25 +34,20 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; * DDL task description for ALTER TABLE ... [PARTITION ... ] CONCATENATE commands. */ @Explain(displayName = "Concatenate", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) -@SuppressWarnings("rawtypes") public class AlterTableConcatenateDesc implements DDLDesc { - private final String tableName; - private final Map<String, String> partitionSpec; - private final ListBucketingCtx lbCtx; - private final Path inputDir; - private final Path outputDir; - private final Class<? extends InputFormat> inputFormatClass; - private final TableDesc tableDesc; - - public AlterTableConcatenateDesc(String tableName, Map<String, String> partitionSpec, ListBucketingCtx lbCtx, - Path inputDir, Path outputDir, Class<? extends InputFormat> inputFormatClass, TableDesc tableDesc) { + private String tableName; + private Map<String, String> partSpec; + private ListBucketingCtx lbCtx; // context for list bucketing. + + private List<Path> inputDir = new ArrayList<Path>(); + private Path outputDir = null; + private Class<? extends InputFormat> inputFormatClass; + private TableDesc tableDesc; + + public AlterTableConcatenateDesc(String tableName, + Map<String, String> partSpec) { this.tableName = tableName; - this.partitionSpec = partitionSpec; - this.lbCtx = lbCtx; - this.inputDir = inputDir; - this.outputDir = outputDir; - this.inputFormatClass = inputFormatClass; - this.tableDesc = tableDesc; + this.partSpec = partSpec; } @Explain(displayName = "table name") @@ -58,28 +55,47 @@ public class AlterTableConcatenateDesc implements DDLDesc { return tableName; } - /** For Explain only. */ - @Explain(displayName = "partition spec") - public Map<String, String> getPartitionSpec() { - return partitionSpec; + @Explain(displayName = "partition desc") + public Map<String, String> getPartSpec() { + return partSpec; } - public ListBucketingCtx getLbCtx() { - return lbCtx; + public Path getOutputDir() { + return outputDir; } - public Path getInputDir() { + public void setOutputDir(Path outputDir) { + this.outputDir = outputDir; + } + + public List<Path> getInputDir() { return inputDir; } - public Path getOutputDir() { - return outputDir; + public void setInputDir(List<Path> inputDir) { + this.inputDir = inputDir; + } + + public ListBucketingCtx getLbCtx() { + return lbCtx; + } + + public void setLbCtx(ListBucketingCtx lbCtx) { + this.lbCtx = lbCtx; } public Class<? extends InputFormat> getInputFormatClass() { return inputFormatClass; } + public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) { + this.inputFormatClass = inputFormatClass; + } + + public void setTableDesc(TableDesc tableDesc) { + this.tableDesc = tableDesc; + } + public TableDesc getTableDesc() { return tableDesc; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateOperation.java index 718c21d..0afc357 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateOperation.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; +import java.io.Serializable; +import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -43,8 +43,6 @@ import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc; import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc; import org.apache.hadoop.hive.ql.plan.TezWork; -import com.google.common.collect.Lists; - /** * Operation process of concatenating the files of a table/partition. */ @@ -55,48 +53,29 @@ public class AlterTableConcatenateOperation extends DDLOperation<AlterTableConca @Override public int execute() throws HiveException { - CompilationOpContext opContext = context.getDriverContext().getCtx().getOpContext(); - - MergeFileWork mergeWork = getMergeFileWork(opContext); - Task<?> task = getTask(mergeWork); - return executeTask(opContext, task); - } - - private MergeFileWork getMergeFileWork(CompilationOpContext opContext) { - List<Path> inputDirList = Lists.newArrayList(desc.getInputDir()); + ListBucketingCtx lbCtx = desc.getLbCtx(); + boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir(); + int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel(); // merge work only needs input and output. - MergeFileWork mergeWork = new MergeFileWork(inputDirList, desc.getOutputDir(), + MergeFileWork mergeWork = new MergeFileWork(desc.getInputDir(), desc.getOutputDir(), desc.getInputFormatClass().getName(), desc.getTableDesc()); + LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); + ArrayList<String> inputDirstr = new ArrayList<String>(1); + inputDirstr.add(desc.getInputDir().toString()); + pathToAliases.put(desc.getInputDir().get(0), inputDirstr); + mergeWork.setPathToAliases(pathToAliases); mergeWork.setListBucketingCtx(desc.getLbCtx()); mergeWork.resolveConcatenateMerge(context.getDb().getConf()); mergeWork.setMapperCannotSpanPartns(true); mergeWork.setSourceTableInputFormat(desc.getInputFormatClass().getName()); - - Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); - List<String> inputDirStr = Lists.newArrayList(inputDirList.toString()); - pathToAliases.put(desc.getInputDir(), inputDirStr); - mergeWork.setPathToAliases(pathToAliases); - - FileMergeDesc fmd = getFileMergeDesc(); - Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(opContext, fmd); - Map<String, Operator<? extends OperatorDesc>> aliasToWork = - new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); - aliasToWork.put(inputDirList.toString(), mergeOp); - mergeWork.setAliasToWork(aliasToWork); - - return mergeWork; - } - - private FileMergeDesc getFileMergeDesc() { - // safe to assume else is ORC as semantic analyzer will check for RC/ORC - FileMergeDesc fmd = (desc.getInputFormatClass().equals(RCFileInputFormat.class)) ? - new RCFileMergeDesc() : - new OrcFileMergeDesc(); - - ListBucketingCtx lbCtx = desc.getLbCtx(); - boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir(); - int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel(); + final FileMergeDesc fmd; + if (desc.getInputFormatClass().equals(RCFileInputFormat.class)) { + fmd = new RCFileMergeDesc(); + } else { + // safe to assume else is ORC as semantic analyzer will check for RC/ORC + fmd = new OrcFileMergeDesc(); + } fmd.setDpCtx(null); fmd.setHasDynamicPartitions(false); @@ -104,30 +83,32 @@ public class AlterTableConcatenateOperation extends DDLOperation<AlterTableConca fmd.setListBucketingDepth(lbd); fmd.setOutputPath(desc.getOutputDir()); - return fmd; - } + CompilationOpContext opContext = context.getDriverContext().getCtx().getOpContext(); + Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(opContext, fmd); - private Task<?> getTask(MergeFileWork mergeWork) { + LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = + new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); + aliasToWork.put(desc.getInputDir().toString(), mergeOp); + mergeWork.setAliasToWork(aliasToWork); + DriverContext driverCxt = new DriverContext(); + Task<?> task; if (context.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { TezWork tezWork = new TezWork(context.getQueryState().getQueryId(), context.getConf()); mergeWork.setName("File Merge"); tezWork.add(mergeWork); - Task<?> task = new TezTask(); + task = new TezTask(); ((TezTask) task).setWork(tezWork); - return task; } else { - Task<?> task = new MergeFileTask(); + task = new MergeFileTask(); ((MergeFileTask) task).setWork(mergeWork); - return task; } - } - private int executeTask(CompilationOpContext opContext, Task<?> task) { - DriverContext driverCxt = new DriverContext(); + // initialize the task and execute task.initialize(context.getQueryState(), context.getQueryPlan(), driverCxt, opContext); + Task<? extends Serializable> subtask = task; int ret = task.execute(driverCxt); - if (task.getException() != null) { - context.getTask().setException(task.getException()); + if (subtask.getException() != null) { + context.getTask().setException(subtask.getException()); } return ret; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index c16aad8..1cbc272 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -343,7 +343,7 @@ public class MapOperator extends AbstractMapOperator { private Map<String, Configuration> cloneConfsForNestedColPruning(Configuration hconf) { Map<String, Configuration> tableNameToConf = new HashMap<>(); - for (Map.Entry<Path, List<String>> e : conf.getPathToAliases().entrySet()) { + for (Map.Entry<Path, ArrayList<String>> e : conf.getPathToAliases().entrySet()) { List<String> aliases = e.getValue(); if (aliases == null || aliases.isEmpty()) { continue; @@ -426,7 +426,7 @@ public class MapOperator extends AbstractMapOperator { Map<String, Configuration> tableNameToConf = cloneConfsForNestedColPruning(hconf); Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(tableNameToConf); - for (Map.Entry<Path, List<String>> entry : conf.getPathToAliases().entrySet()) { + for (Map.Entry<Path, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) { Path onefile = entry.getKey(); List<String> aliases = entry.getValue(); PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 4372663..6b8e286 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2384,7 +2384,7 @@ public final class Utilities { final Configuration myConf = conf; final JobConf myJobConf = jobConf; final Map<String, Operator<?>> aliasToWork = work.getAliasToWork(); - final Map<Path, List<String>> pathToAlias = work.getPathToAliases(); + final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases(); final PartitionDesc partDesc = work.getPathToPartitionInfo().get(path); Runnable r = new Runnable() { @Override @@ -3219,7 +3219,8 @@ public final class Utilities { LOG.info("Processing alias {}", alias); // The alias may not have any path - Collection<Map.Entry<Path, List<String>>> pathToAliases = work.getPathToAliases().entrySet(); + Collection<Map.Entry<Path, ArrayList<String>>> pathToAliases = + work.getPathToAliases().entrySet(); if (!skipDummy) { // ConcurrentModification otherwise if adding dummy. pathToAliases = new ArrayList<>(pathToAliases); @@ -3227,7 +3228,7 @@ public final class Utilities { boolean isEmptyTable = true; boolean hasLogged = false; - for (Map.Entry<Path, List<String>> e : pathToAliases) { + for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) { if (lDrvStat != null && lDrvStat.isAborted()) { throw new IOException("Operation is Canceled."); } @@ -3446,8 +3447,8 @@ public final class Utilities { // update the work - Map<Path, List<String>> pathToAliases = work.getPathToAliases(); - List<String> newList = new ArrayList<String>(1); + LinkedHashMap<Path, ArrayList<String>> pathToAliases = work.getPathToAliases(); + ArrayList<String> newList = new ArrayList<String>(1); newList.add(alias); pathToAliases.put(newPath, newList); @@ -3509,7 +3510,7 @@ public final class Utilities { public static void createTmpDirs(Configuration conf, MapWork mWork) throws IOException { - Map<Path, List<String>> pa = mWork.getPathToAliases(); + Map<Path, ArrayList<String>> pa = mWork.getPathToAliases(); if (MapUtils.isNotEmpty(pa)) { // common case: 1 table scan per map-work // rare case: smb joins diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 3278dfe..f06ac37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -261,7 +261,7 @@ public class DagUtils { Set<URI> fileSinkUris = new HashSet<URI>(); List<Node> topNodes = new ArrayList<Node>(); - Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); + LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); for (Operator<? extends OperatorDesc> operator : aliasToWork.values()) { topNodes.add(operator); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index 308de1a..5a903d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -28,9 +28,11 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -44,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType; import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -558,7 +561,7 @@ public class VectorMapOperator extends AbstractMapOperator { HashMap<PartitionDesc, VectorPartitionContext> partitionContextMap = new HashMap<PartitionDesc, VectorPartitionContext>(); - for (Map.Entry<Path, List<String>> entry : conf.getPathToAliases().entrySet()) { + for (Map.Entry<Path, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) { Path path = entry.getKey(); PartitionDesc partDesc = conf.getPathToPartitionInfo().get(path); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 1844ce0..5f2539f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -334,7 +334,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException { init(job); - Map<Path, List<String>> pathToAliases = mrwork.getPathToAliases(); + Map<Path, ArrayList<String>> pathToAliases = mrwork.getPathToAliases(); Map<String, Operator<? extends OperatorDesc>> aliasToWork = mrwork.getAliasToWork(); CombineFileInputFormatShim combine = ShimLoader.getHadoopShims() @@ -608,11 +608,11 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ * @return the sampled splits */ private List<CombineFileSplit> sampleSplits(List<CombineFileSplit> splits) { - Map<String, SplitSample> nameToSamples = mrwork.getNameToSplitSample(); + HashMap<String, SplitSample> nameToSamples = mrwork.getNameToSplitSample(); List<CombineFileSplit> retLists = new ArrayList<CombineFileSplit>(); Map<String, ArrayList<CombineFileSplit>> aliasToSplitList = new HashMap<String, ArrayList<CombineFileSplit>>(); - Map<Path, List<String>> pathToAliases = mrwork.getPathToAliases(); - Map<Path, List<String>> pathToAliasesNoScheme = removeScheme(pathToAliases); + Map<Path, ArrayList<String>> pathToAliases = mrwork.getPathToAliases(); + Map<Path, ArrayList<String>> pathToAliasesNoScheme = removeScheme(pathToAliases); // Populate list of exclusive splits for every sampled alias // @@ -681,9 +681,9 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ return retLists; } - Map<Path, List<String>> removeScheme(Map<Path, List<String>> pathToAliases) { - Map<Path, List<String>> result = new HashMap<>(); - for (Map.Entry <Path, List<String>> entry : pathToAliases.entrySet()) { + Map<Path, ArrayList<String>> removeScheme(Map<Path, ArrayList<String>> pathToAliases) { + Map<Path, ArrayList<String>> result = new HashMap<>(); + for (Map.Entry <Path, ArrayList<String>> entry : pathToAliases.entrySet()) { Path newKey = Path.getPathWithoutSchemeAndAuthority(entry.getKey()); StringInternUtils.internUriStringsInPath(newKey); result.put(newKey, entry.getValue()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java index 0d2eb0a..07824c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -52,7 +53,7 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri extends HiveContextAwareRecordReader<K, V> { private org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(CombineHiveRecordReader.class); - private Map<Path, PartitionDesc> pathToPartInfo; + private LinkedHashMap<Path, PartitionDesc> pathToPartInfo; public CombineHiveRecordReader(InputSplit split, Configuration conf, Reporter reporter, Integer partition, RecordReader preReader) throws IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index 8980a62..f75ed5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -23,11 +23,13 @@ import java.nio.file.FileSystemNotFoundException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; @@ -64,6 +66,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.util.Shell; import org.apache.hive.common.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -417,7 +420,8 @@ public final class HiveFileFormatUtils { return pathToPartitionInfo.get(path); } - private static boolean foundAlias(Map<Path, List<String>> pathToAliases, Path path) { + private static boolean foundAlias(Map<Path, ArrayList<String>> pathToAliases, + Path path) { List<String> aliases = pathToAliases.get(path); if ((aliases == null) || (aliases.isEmpty())) { return false; @@ -425,7 +429,8 @@ public final class HiveFileFormatUtils { return true; } - private static Path getMatchingPath(Map<Path, List<String>> pathToAliases, Path dir) { + private static Path getMatchingPath(Map<Path, ArrayList<String>> pathToAliases, + Path dir) { // First find the path to be searched Path path = dir; if (foundAlias(pathToAliases, path)) { @@ -457,9 +462,11 @@ public final class HiveFileFormatUtils { * @param aliasToWork The operator tree to be invoked for a given alias * @param dir The path to look for **/ - public static List<Operator<? extends OperatorDesc>> doGetWorksFromPath(Map<Path, List<String>> pathToAliases, - Map<String, Operator<? extends OperatorDesc>> aliasToWork, Path dir) { - List<Operator<? extends OperatorDesc>> opList = new ArrayList<Operator<? extends OperatorDesc>>(); + public static List<Operator<? extends OperatorDesc>> doGetWorksFromPath( + Map<Path, ArrayList<String>> pathToAliases, + Map<String, Operator<? extends OperatorDesc>> aliasToWork, Path dir) { + List<Operator<? extends OperatorDesc>> opList = + new ArrayList<Operator<? extends OperatorDesc>>(); List<String> aliases = doGetAliasesFromPath(pathToAliases, dir); for (String alias : aliases) { @@ -473,7 +480,9 @@ public final class HiveFileFormatUtils { * @param pathToAliases mapping from path to aliases * @param dir The path to look for **/ - public static List<String> doGetAliasesFromPath(Map<Path, List<String>> pathToAliases, Path dir) { + public static List<String> doGetAliasesFromPath( + Map<Path, ArrayList<String>> pathToAliases, + Path dir) { if (pathToAliases == null) { return new ArrayList<String>(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index cff7e04..4bd4a24 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -878,12 +878,13 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } ArrayList<String> aliases = new ArrayList<String>(); - Iterator<Entry<Path, List<String>>> iterator = this.mrwork.getPathToAliases().entrySet().iterator(); + Iterator<Entry<Path, ArrayList<String>>> iterator = this.mrwork + .getPathToAliases().entrySet().iterator(); Set<Path> splitParentPaths = null; int pathsSize = this.mrwork.getPathToAliases().entrySet().size(); while (iterator.hasNext()) { - Entry<Path, List<String>> entry = iterator.next(); + Entry<Path, ArrayList<String>> entry = iterator.next(); Path key = entry.getKey(); boolean match; if (nonNative) { @@ -913,7 +914,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } } if (match) { - List<String> list = entry.getValue(); + ArrayList<String> list = entry.getValue(); for (String val : list) { aliases.add(val); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java index 30957ca..26f7733 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java @@ -43,7 +43,7 @@ public class SymbolicInputFormat implements ReworkMapredInputFormat { Map<Path, PartitionDesc> pathToParts = work.getMapWork().getPathToPartitionInfo(); List<Path> toRemovePaths = new ArrayList<>(); Map<Path, PartitionDesc> toAddPathToPart = new HashMap<>(); - Map<Path, List<String>> pathToAliases = work.getMapWork().getPathToAliases(); + Map<Path, ArrayList<String>> pathToAliases = work.getMapWork().getPathToAliases(); for (Map.Entry<Path, PartitionDesc> pathPartEntry : pathToParts.entrySet()) { Path path = pathPartEntry.getKey(); @@ -62,7 +62,7 @@ public class SymbolicInputFormat implements ReworkMapredInputFormat { symlinks = fileSystem.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); } toRemovePaths.add(path); - List<String> aliases = pathToAliases.remove(path); + ArrayList<String> aliases = pathToAliases.remove(path); for (FileStatus symlink : symlinks) { BufferedReader reader = null; try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java index 594289e..3044603 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java @@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.InputFormat; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; @Explain(displayName = "Merge File Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -131,7 +132,7 @@ public class MergeFileWork extends MapWork { public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path, TableDesc tblDesc, - List<String> aliases, + ArrayList<String> aliases, PartitionDesc partDesc) { super.resolveDynamicPartitionStoredAsSubDirsMerge(conf, path, tblDesc, aliases, partDesc); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index 6d525ba..0444562 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -18,9 +18,11 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hive.common.StringInternUtils; @@ -85,7 +87,7 @@ public class ProjectionPusher { final Set<String> aliases = new HashSet<String>(); try { - List<String> a = HiveFileFormatUtils.getFromPathRecursively( + ArrayList<String> a = HiveFileFormatUtils.getFromPathRecursively( mapWork.getPathToAliases(), new Path(splitPath), null, false, true); if (a != null) { aliases.addAll(a); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 5d6143d..3277765 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -117,6 +117,7 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.InputFormat; @@ -861,7 +862,8 @@ public final class GenMapRedUtils { } else if (task instanceof ExecDriver) { MapredWork work = (MapredWork) task.getWork(); work.getMapWork().deriveExplainAttributes(); - Map<String, Operator<? extends OperatorDesc>> opMap = work.getMapWork().getAliasToWork(); + HashMap<String, Operator<? extends OperatorDesc>> opMap = work + .getMapWork().getAliasToWork(); if (opMap != null && !opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { setKeyAndValueDesc(work.getReduceWork(), op); @@ -983,7 +985,7 @@ public final class GenMapRedUtils { conf.getBoolVar( HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS); work.setMapperCannotSpanPartns(mapperCannotSpanPartns); - work.setPathToAliases(new LinkedHashMap<Path, List<String>>()); + work.setPathToAliases(new LinkedHashMap<Path, ArrayList<String>>()); work.setPathToPartitionInfo(new LinkedHashMap<Path, PartitionDesc>()); work.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>()); return mrWork; @@ -1162,13 +1164,13 @@ public final class GenMapRedUtils { */ public static void replaceMapWork(String sourceAlias, String targetAlias, MapWork source, MapWork target) { - Map<Path, List<String>> sourcePathToAliases = source.getPathToAliases(); + Map<Path, ArrayList<String>> sourcePathToAliases = source.getPathToAliases(); Map<Path, PartitionDesc> sourcePathToPartitionInfo = source.getPathToPartitionInfo(); Map<String, Operator<? extends OperatorDesc>> sourceAliasToWork = source.getAliasToWork(); Map<String, PartitionDesc> sourceAliasToPartnInfo = source.getAliasToPartnInfo(); - Map<Path, List<String>> targetPathToAliases = target.getPathToAliases(); - Map<Path, PartitionDesc> targetPathToPartitionInfo = target.getPathToPartitionInfo(); + LinkedHashMap<Path, ArrayList<String>> targetPathToAliases = target.getPathToAliases(); + LinkedHashMap<Path, PartitionDesc> targetPathToPartitionInfo = target.getPathToPartitionInfo(); Map<String, Operator<? extends OperatorDesc>> targetAliasToWork = target.getAliasToWork(); Map<String, PartitionDesc> targetAliasToPartnInfo = target.getAliasToPartnInfo(); @@ -1190,8 +1192,8 @@ public final class GenMapRedUtils { targetAliasToWork.remove(targetAlias); targetAliasToPartnInfo.remove(targetAlias); List<Path> pathsToRemove = new ArrayList<>(); - for (Entry<Path, List<String>> entry: targetPathToAliases.entrySet()) { - List<String> aliases = entry.getValue(); + for (Entry<Path, ArrayList<String>> entry: targetPathToAliases.entrySet()) { + ArrayList<String> aliases = entry.getValue(); aliases.remove(targetAlias); if (aliases.isEmpty()) { pathsToRemove.add(entry.getKey()); @@ -1207,8 +1209,8 @@ public final class GenMapRedUtils { targetAliasToPartnInfo.putAll(sourceAliasToPartnInfo); targetPathToPartitionInfo.putAll(sourcePathToPartitionInfo); List<Path> pathsToAdd = new ArrayList<>(); - for (Entry<Path, List<String>> entry: sourcePathToAliases.entrySet()) { - List<String> aliases = entry.getValue(); + for (Entry<Path, ArrayList<String>> entry: sourcePathToAliases.entrySet()) { + ArrayList<String> aliases = entry.getValue(); if (aliases.contains(sourceAlias)) { pathsToAdd.add(entry.getKey()); } @@ -1651,7 +1653,7 @@ public final class GenMapRedUtils { // create the merge file work MergeFileWork work = new MergeFileWork(inputDirs, finalName, hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName(), tblDesc); - Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); + LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); pathToAliases.put(inputDir, inputDirstr); work.setMapperCannotSpanPartns(true); work.setPathToAliases(pathToAliases); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index 5ed43c7..1256e1c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -147,14 +147,14 @@ public class MapJoinProcessor extends Transform { smallTableAliasList.add(alias); // get input path and remove this alias from pathToAlias // because this file will be fetched by fetch operator - Map<Path, List<String>> pathToAliases = newWork.getMapWork().getPathToAliases(); + LinkedHashMap<Path, ArrayList<String>> pathToAliases = newWork.getMapWork().getPathToAliases(); // keep record all the input path for this alias HashSet<Path> pathSet = new HashSet<>(); HashSet<Path> emptyPath = new HashSet<>(); - for (Map.Entry<Path, List<String>> entry2 : pathToAliases.entrySet()) { + for (Map.Entry<Path, ArrayList<String>> entry2 : pathToAliases.entrySet()) { Path path = entry2.getKey(); - List<String> list = entry2.getValue(); + ArrayList<String> list = entry2.getValue(); if (list.contains(alias)) { // add to path set pathSet.add(path); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java index 4ac2567..0b5de81 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.optimizer.physical; import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Stack; @@ -117,8 +119,8 @@ public abstract class AbstractJoinTaskDispatcher implements Dispatcher { } public long getTotalKnownInputSize(Context context, MapWork currWork, - Map<Path, List<String>> pathToAliases, - Map<String, Long> aliasToSize) throws SemanticException { + Map<Path, ArrayList<String>> pathToAliases, + HashMap<String, Long> aliasToSize) throws SemanticException { try { // go over all the input paths, and calculate a known total size, known // size for each input alias. @@ -128,7 +130,7 @@ public abstract class AbstractJoinTaskDispatcher implements Dispatcher { // is chosen as big table, what's the total size of left tables, which // are going to be small tables. long aliasTotalKnownInputSize = 0L; - for (Map.Entry<Path, List<String>> entry : pathToAliases.entrySet()) { + for (Map.Entry<Path, ArrayList<String>> entry : pathToAliases.entrySet()) { Path path = entry.getKey(); List<String> aliasList = entry.getValue(); ContentSummary cs = context.getCS(path); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 0d9d5e0..e564daf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -241,7 +241,7 @@ public class CommonJoinTaskDispatcher extends AbstractJoinTaskDispatcher impleme String childMRAlias = childMRAliases.get(0); // Sanity check to make sure there is no alias conflict after merge. - for (Entry<Path, List<String>> entry : childMapWork.getPathToAliases().entrySet()) { + for (Entry<Path, ArrayList<String>> entry : childMapWork.getPathToAliases().entrySet()) { Path path = entry.getKey(); List<String> aliases = entry.getValue(); @@ -392,7 +392,7 @@ public class CommonJoinTaskDispatcher extends AbstractJoinTaskDispatcher impleme // Must be deterministic order map for consistent q-test output across Java versions HashMap<Task<? extends Serializable>, Set<String>> taskToAliases = new LinkedHashMap<Task<? extends Serializable>, Set<String>>(); - Map<Path, List<String>> pathToAliases = currWork.getPathToAliases(); + HashMap<Path, ArrayList<String>> pathToAliases = currWork.getPathToAliases(); Map<String, Operator<? extends OperatorDesc>> aliasToWork = currWork.getAliasToWork(); // start to generate multiple map join tasks diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java index b7dd90d..ec9813d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java @@ -148,14 +148,14 @@ public class NullScanTaskDispatcher implements Dispatcher { tso.getConf().setIsMetadataOnly(true); } // group path alias according to work - Map<Path, List<String>> candidates = new HashMap<>(); + Map<Path, ArrayList<String>> candidates = new HashMap<>(); for (Path path : work.getPaths()) { - List<String> aliasesAffected = work.getPathToAliases().get(path); + ArrayList<String> aliasesAffected = work.getPathToAliases().get(path); if (CollectionUtils.isNotEmpty(aliasesAffected)) { candidates.put(path, aliasesAffected); } } - for (Entry<Path, List<String>> entry : candidates.entrySet()) { + for (Entry<Path, ArrayList<String>> entry : candidates.entrySet()) { processAlias(work, entry.getKey(), entry.getValue(), aliases); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index ebf1708..af3edf8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor; +import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx; @@ -76,7 +77,7 @@ public class SortMergeJoinTaskDispatcher extends AbstractJoinTaskDispatcher impl Map<String, PartitionDesc> aliasToPartitionInfo = currWork.getAliasToPartnInfo(); List<Path> removePaths = new ArrayList<>(); - for (Map.Entry<Path, List<String>> entry : currWork.getPathToAliases().entrySet()) { + for (Map.Entry<Path, ArrayList<String>> entry : currWork.getPathToAliases().entrySet()) { boolean keepPath = false; for (String alias : entry.getValue()) { if (aliasToPartitionInfo.containsKey(alias)) { @@ -259,7 +260,7 @@ public class SortMergeJoinTaskDispatcher extends AbstractJoinTaskDispatcher impl HashMap<Task<? extends Serializable>, Set<String>> taskToAliases = new LinkedHashMap<Task<? extends Serializable>, Set<String>>(); // Note that pathToAlias will behave as if the original plan was a join plan - Map<Path, List<String>> pathToAliases = currJoinWork.getMapWork().getPathToAliases(); + HashMap<Path, ArrayList<String>> pathToAliases = currJoinWork.getMapWork().getPathToAliases(); // generate a map join task for the big table SMBJoinDesc originalSMBJoinDesc = originalSMBJoinOp.getConf(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index cbb3df0..1cf44b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -1207,7 +1207,7 @@ public class Vectorizer implements PhysicalPlanResolver { // Eliminate MR plans with more than one TableScanOperator. - Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); + LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); if ((aliasToWork == null) || (aliasToWork.size() == 0)) { setNodeIssue("Vectorized map work requires work"); return null; @@ -1714,8 +1714,8 @@ public class Vectorizer implements PhysicalPlanResolver { List<String> tableDataColumnList = null; List<TypeInfo> tableDataTypeInfoList = null; - Map<Path, List<String>> pathToAliases = mapWork.getPathToAliases(); - Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo(); + LinkedHashMap<Path, ArrayList<String>> pathToAliases = mapWork.getPathToAliases(); + LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo(); // Remember the input file formats we validated and why. Set<String> inputFileFormatClassNameSet = new HashSet<String>(); @@ -1726,7 +1726,7 @@ public class Vectorizer implements PhysicalPlanResolver { Set<Support> inputFormatSupportSet = new TreeSet<Support>(); boolean outsideLoopIsFirstPartition = true; - for (Entry<Path, List<String>> entry: pathToAliases.entrySet()) { + for (Entry<Path, ArrayList<String>> entry: pathToAliases.entrySet()) { final boolean isFirstPartition = outsideLoopIsFirstPartition; outsideLoopIsFirstPartition = false; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index ff7f9a8..698d7fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -1995,7 +1995,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { private void analyzeAlterTablePartMergeFiles(ASTNode ast, String tableName, HashMap<String, String> partSpec) throws SemanticException { + AlterTableConcatenateDesc mergeDesc = new AlterTableConcatenateDesc( + tableName, partSpec); + List<Path> inputDir = new ArrayList<Path>(); Path oldTblPartLoc = null; Path newTblPartLoc = null; Table tblObj = null; @@ -2016,6 +2019,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc))); return; } + mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj)); List<String> bucketCols = null; Class<? extends InputFormat> inputFormatClass = null; @@ -2060,9 +2064,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } // throw a HiveException for other than rcfile and orcfile. - if (!(inputFormatClass.equals(RCFileInputFormat.class) || inputFormatClass.equals(OrcInputFormat.class))) { + if (!((inputFormatClass.equals(RCFileInputFormat.class) || + (inputFormatClass.equals(OrcInputFormat.class))))) { throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_FILE_FORMAT.getMsg()); } + mergeDesc.setInputFormatClass(inputFormatClass); // throw a HiveException if the table/partition is bucketized if (bucketCols != null && bucketCols.size() > 0) { @@ -2084,14 +2090,19 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED.getMsg()); } + inputDir.add(oldTblPartLoc); + + mergeDesc.setInputDir(inputDir); + + mergeDesc.setLbCtx(lbCtx); + addInputsOutputsAlterTable(tableName, partSpec, null, AlterTableType.MERGEFILES, false); - TableDesc tblDesc = Utilities.getTableDesc(tblObj); - Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); - AlterTableConcatenateDesc mergeDesc = new AlterTableConcatenateDesc(tableName, partSpec, lbCtx, oldTblPartLoc, - queryTmpdir, inputFormatClass, Utilities.getTableDesc(tblObj)); DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), mergeDesc); ddlWork.setNeedLock(true); Task<?> mergeTask = TaskFactory.get(ddlWork); + TableDesc tblDesc = Utilities.getTableDesc(tblObj); + Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); + mergeDesc.setOutputDir(queryTmpdir); // No need to handle MM tables - unsupported path. LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap<>() : partSpec); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java index 42637df..41a3b00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.parse; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -96,7 +97,7 @@ public class MapReduceCompiler extends TaskCompiler { protected void setInputFormat(Task<? extends Serializable> task) { if (task instanceof ExecDriver) { MapWork work = ((MapredWork) task.getWork()).getMapWork(); - Map<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork(); + HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork(); if (!opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { setInputFormat(work, op); @@ -140,8 +141,8 @@ public class MapReduceCompiler extends TaskCompiler { private void breakTaskTree(Task<? extends Serializable> task) { if (task instanceof ExecDriver) { - Map<String, Operator<? extends OperatorDesc>> opMap = - ((MapredWork) task.getWork()).getMapWork().getAliasToWork(); + HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task + .getWork()).getMapWork().getAliasToWork(); if (!opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { breakOperatorTree(op); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 5000ba4..8c3ee0c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -697,7 +697,7 @@ public class TezCompiler extends TaskCompiler { for (BaseWork w: all) { if (w instanceof MapWork) { MapWork mapWork = (MapWork) w; - Map<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork(); + HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork(); if (!opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { setInputFormat(mapWork, op); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 6bc5925..5bf5502 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -530,7 +530,7 @@ public class SparkCompiler extends TaskCompiler { for (BaseWork w: all) { if (w instanceof MapWork) { MapWork mapWork = (MapWork) w; - Map<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork(); + HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork(); if (!opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { setInputFormat(mapWork, op); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java index cc5baee..7c1dc45 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java @@ -21,9 +21,11 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Map.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,8 +53,8 @@ public class ConditionalResolverCommonJoin implements ConditionalResolver, Seria private static final long serialVersionUID = 1L; private HashMap<Task<? extends Serializable>, Set<String>> taskToAliases; - Map<Path, List<String>> pathToAliases; - Map<String, Long> aliasToKnownSize; + HashMap<Path, ArrayList<String>> pathToAliases; + HashMap<String, Long> aliasToKnownSize; private Task<? extends Serializable> commonJoinTask; private Path localTmpDir; @@ -77,7 +79,7 @@ public class ConditionalResolverCommonJoin implements ConditionalResolver, Seria this.commonJoinTask = commonJoinTask; } - public Map<String, Long> getAliasToKnownSize() { + public HashMap<String, Long> getAliasToKnownSize() { return aliasToKnownSize == null ? aliasToKnownSize = new HashMap<String, Long>() : aliasToKnownSize; } @@ -86,11 +88,11 @@ public class ConditionalResolverCommonJoin implements ConditionalResolver, Seria this.aliasToKnownSize = aliasToKnownSize; } - public Map<Path, List<String>> getPathToAliases() { + public HashMap<Path, ArrayList<String>> getPathToAliases() { return pathToAliases; } - public void setPathToAliases(Map<Path, List<String>> pathToAliases) { + public void setPathToAliases(final HashMap<Path, ArrayList<String>> pathToAliases) { this.pathToAliases = pathToAliases; } @@ -212,10 +214,10 @@ public class ConditionalResolverCommonJoin implements ConditionalResolver, Seria Set<String> aliases = getParticipants(ctx); Map<String, Long> aliasToKnownSize = ctx.getAliasToKnownSize(); - Map<Path, List<String>> pathToAliases = ctx.getPathToAliases(); + Map<Path, ArrayList<String>> pathToAliases = ctx.getPathToAliases(); Set<Path> unknownPaths = new HashSet<>(); - for (Map.Entry<Path, List<String>> entry : pathToAliases.entrySet()) { + for (Map.Entry<Path, ArrayList<String>> entry : pathToAliases.entrySet()) { for (String alias : entry.getValue()) { if (aliases.contains(alias) && !aliasToKnownSize.containsKey(alias)) { unknownPaths.add(entry.getKey()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index a828809..54c9659 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -248,10 +248,10 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, work.removePathToPartitionInfo(path); // the root path is not useful anymore // cleanup pathToAliases - Map<Path, List<String>> pta = work.getPathToAliases(); + LinkedHashMap<Path, ArrayList<String>> pta = work.getPathToAliases(); assert pta.size() == 1; path = pta.keySet().iterator().next(); - List<String> aliases = pta.get(path); + ArrayList<String> aliases = pta.get(path); work.removePathToAlias(path); // the root path is not useful anymore // populate pathToPartitionInfo and pathToAliases w/ DP paths diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 1d06435..bb063c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -94,16 +94,15 @@ public class MapWork extends BaseWork { // use LinkedHashMap to make sure the iteration order is // deterministic, to ease testing - private Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); + private LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); - private Map<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>(); + private LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>(); - private Map<String, Operator<? extends OperatorDesc>> aliasToWork = - new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); + private LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); - private Map<String, PartitionDesc> aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>(); + private LinkedHashMap<String, PartitionDesc> aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>(); - private Map<String, SplitSample> nameToSplitSample = new LinkedHashMap<String, SplitSample>(); + private HashMap<String, SplitSample> nameToSplitSample = new LinkedHashMap<String, SplitSample>(); // If this map task has a FileSinkOperator, and bucketing/sorting metadata can be // inferred about the data being written by that operator, these are mappings from the directory @@ -182,25 +181,25 @@ public class MapWork extends BaseWork { } @Explain(displayName = "Path -> Alias", explainLevels = { Level.EXTENDED }) - public Map<Path, List<String>> getPathToAliases() { + public LinkedHashMap<Path, ArrayList<String>> getPathToAliases() { // return pathToAliases; } - public void setPathToAliases(Map<Path, List<String>> pathToAliases) { + public void setPathToAliases(final LinkedHashMap<Path, ArrayList<String>> pathToAliases) { for (Path p : pathToAliases.keySet()) { StringInternUtils.internUriStringsInPath(p); } this.pathToAliases = pathToAliases; } - public void addPathToAlias(Path path, List<String> aliases){ + public void addPathToAlias(Path path, ArrayList<String> aliases){ StringInternUtils.internUriStringsInPath(path); pathToAliases.put(path, aliases); } public void addPathToAlias(Path path, String newAlias){ - List<String> aliases = pathToAliases.get(path); + ArrayList<String> aliases = pathToAliases.get(path); if (aliases == null) { aliases = new ArrayList<>(1); StringInternUtils.internUriStringsInPath(path); @@ -226,25 +225,26 @@ public class MapWork extends BaseWork { * @return */ @Explain(displayName = "Truncated Path -> Alias", explainLevels = { Level.EXTENDED }) - public Map<String, List<String>> getTruncatedPathToAliases() { - Map<String, List<String>> trunPathToAliases = new LinkedHashMap<String, List<String>>(); - Iterator<Entry<Path, List<String>>> itr = this.pathToAliases.entrySet().iterator(); + public Map<String, ArrayList<String>> getTruncatedPathToAliases() { + Map<String, ArrayList<String>> trunPathToAliases = new LinkedHashMap<String, + ArrayList<String>>(); + Iterator<Entry<Path, ArrayList<String>>> itr = this.pathToAliases.entrySet().iterator(); while (itr.hasNext()) { - Entry<Path, List<String>> entry = itr.next(); + final Entry<Path, ArrayList<String>> entry = itr.next(); Path origiKey = entry.getKey(); String newKey = PlanUtils.removePrefixFromWarehouseConfig(origiKey.toString()); - List<String> value = entry.getValue(); + ArrayList<String> value = entry.getValue(); trunPathToAliases.put(newKey, value); } return trunPathToAliases; } @Explain(displayName = "Path -> Partition", explainLevels = { Level.EXTENDED }) - public Map<Path, PartitionDesc> getPathToPartitionInfo() { + public LinkedHashMap<Path, PartitionDesc> getPathToPartitionInfo() { return pathToPartitionInfo; } - public void setPathToPartitionInfo(final Map<Path, PartitionDesc> pathToPartitionInfo) { + public void setPathToPartitionInfo(final LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo) { for (Path p : pathToPartitionInfo.keySet()) { StringInternUtils.internUriStringsInPath(p); } @@ -364,7 +364,7 @@ public class MapWork extends BaseWork { /** * @return the aliasToPartnInfo */ - public Map<String, PartitionDesc> getAliasToPartnInfo() { + public LinkedHashMap<String, PartitionDesc> getAliasToPartnInfo() { return aliasToPartnInfo; } @@ -377,16 +377,17 @@ public class MapWork extends BaseWork { this.aliasToPartnInfo = aliasToPartnInfo; } - public Map<String, Operator<? extends OperatorDesc>> getAliasToWork() { + public LinkedHashMap<String, Operator<? extends OperatorDesc>> getAliasToWork() { return aliasToWork; } - public void setAliasToWork(Map<String, Operator<? extends OperatorDesc>> aliasToWork) { + public void setAliasToWork( + final LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork) { this.aliasToWork = aliasToWork; } @Explain(displayName = "Split Sample", explainLevels = { Level.EXTENDED }) - public Map<String, SplitSample> getNameToSplitSample() { + public HashMap<String, SplitSample> getNameToSplitSample() { return nameToSplitSample; } @@ -416,7 +417,7 @@ public class MapWork extends BaseWork { public void addMapWork(Path path, String alias, Operator<?> work, PartitionDesc pd) { StringInternUtils.internUriStringsInPath(path); - List<String> curAliases = pathToAliases.get(path); + ArrayList<String> curAliases = pathToAliases.get(path); if (curAliases == null) { assert (pathToPartitionInfo.get(path) == null); curAliases = new ArrayList<>(); @@ -449,7 +450,7 @@ public class MapWork extends BaseWork { } public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path, - TableDesc tblDesc, List<String> aliases, PartitionDesc partDesc) { + TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) { StringInternUtils.internUriStringsInPath(path); pathToAliases.put(path, aliases); pathToPartitionInfo.put(path, partDesc); @@ -521,7 +522,7 @@ public class MapWork extends BaseWork { public void mergeAliasedInput(String alias, Path pathDir, PartitionDesc partitionInfo) { StringInternUtils.internUriStringsInPath(pathDir); alias = alias.intern(); - List<String> aliases = pathToAliases.get(pathDir); + ArrayList<String> aliases = pathToAliases.get(pathDir); if (aliases == null) { aliases = new ArrayList<>(Arrays.asList(alias)); pathToAliases.put(pathDir, aliases); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java index 5f55ceb..a946b4f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java @@ -26,12 +26,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -382,8 +382,10 @@ public class TestGetInputSummary { context.addCS(partitionPath.toString(), entry.getValue()); } - Map<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>(); - Map<Path, List<String>> pathToAliasTable = new LinkedHashMap<>(); + LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = + new LinkedHashMap<>(); + LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = + new LinkedHashMap<>(); TableScanOperator scanOp = new TableScanOperator(); PartitionDesc partitionDesc = new PartitionDesc( diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index a363b22..c7cd4ad 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -299,10 +299,10 @@ public class TestOperators extends TestCase { new Path("hdfs:///testDir/testFile")); // initialize pathToAliases - List<String> aliases = new ArrayList<String>(); + ArrayList<String> aliases = new ArrayList<String>(); aliases.add("a"); aliases.add("b"); - Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); + LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); pathToAliases.put(new Path("hdfs:///testDir"), aliases); // initialize pathToTableInfo diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java index d8e4347..3aaf561 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java @@ -22,8 +22,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -66,7 +64,7 @@ public class TestPlan extends TestCase { ArrayList<String> aliasList = new ArrayList<String>(); aliasList.add("a"); - Map<Path, List<String>> pa = new LinkedHashMap<>(); + LinkedHashMap<Path, ArrayList<String>> pa = new LinkedHashMap<>(); pa.put(new Path("/tmp/testfolder"), aliasList); TableDesc tblDesc = Utilities.defaultTd; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index fbf948c..2d48449 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -366,7 +366,7 @@ public class TestUtilities { assertEquals(mapWork.getPathToPartitionInfo().size(), numPartitions); assertEquals(mapWork.getAliasToWork().size(), numPartitions); - for (Map.Entry<Path, List<String>> entry : mapWork.getPathToAliases().entrySet()) { + for (Map.Entry<Path, ArrayList<String>> entry : mapWork.getPathToAliases().entrySet()) { assertNotNull(entry.getKey()); assertNotNull(entry.getValue()); assertEquals(entry.getValue().size(), 1); @@ -485,7 +485,7 @@ public class TestUtilities { MapWork mapWork = new MapWork(); Path scratchDir = new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR)); - Map<Path, List<String>> pathToAliasTable = new LinkedHashMap<>(); + LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = new LinkedHashMap<>(); String testTableName = "testTable"; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index befeb4f..b67aec3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -133,14 +133,14 @@ public class TestTezTask { op = mock(Operator.class); - Map<String, Operator<? extends OperatorDesc>> map + LinkedHashMap<String, Operator<? extends OperatorDesc>> map = new LinkedHashMap<String,Operator<? extends OperatorDesc>>(); map.put("foo", op); mws[0].setAliasToWork(map); mws[1].setAliasToWork(map); - Map<Path, List<String>> pathMap = new LinkedHashMap<>(); - List<String> aliasList = new ArrayList<String>(); + LinkedHashMap<Path, ArrayList<String>> pathMap = new LinkedHashMap<>(); + ArrayList<String> aliasList = new ArrayList<String>(); aliasList.add("foo"); pathMap.put(new Path("foo"), aliasList); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index b5958fa..9a8ae3b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -2201,8 +2201,8 @@ public class TestInputOutputFormat { mapWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx); } mapWork.setUseBucketizedHiveInputFormat(false); - Map<Path, List<String>> aliasMap = new LinkedHashMap<>(); - List<String> aliases = new ArrayList<String>(); + LinkedHashMap<Path, ArrayList<String>> aliasMap = new LinkedHashMap<>(); + ArrayList<String> aliases = new ArrayList<String>(); aliases.add(tableName); LinkedHashMap<Path, PartitionDesc> partMap = new LinkedHashMap<>(); for(int p=0; p < partitions; ++p) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java index 3fc82ad..3a8b5e7 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java @@ -32,8 +32,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; import java.util.Set; public class TestConditionalResolverCommonJoin { @@ -42,7 +40,7 @@ public class TestConditionalResolverCommonJoin { public void testResolvingDriverAlias() throws Exception { ConditionalResolverCommonJoin resolver = new ConditionalResolverCommonJoin(); - Map<Path, List<String>> pathToAliases = new HashMap<>(); + HashMap<Path, ArrayList<String>> pathToAliases = new HashMap<>(); pathToAliases.put(new Path("path1"), new ArrayList<String>(Arrays.asList("alias1", "alias2"))); pathToAliases.put(new Path("path2"), new ArrayList<String>(Arrays.asList("alias3"))); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java index 3e0d834..1756711 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java @@ -19,9 +19,8 @@ package org.apache.hadoop.hive.ql.plan; import static org.junit.Assert.assertEquals; +import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; import org.apache.hadoop.fs.Path; import org.junit.Test; @@ -32,11 +31,11 @@ public class TestMapWork { @Test public void testGetAndSetConsistency() { MapWork mw = new MapWork(); - Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); + LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); pathToAliases.put(new Path("p0"), Lists.newArrayList("a1", "a2")); mw.setPathToAliases(pathToAliases); - Map<Path, List<String>> pta = mw.getPathToAliases(); + LinkedHashMap<Path, ArrayList<String>> pta = mw.getPathToAliases(); assertEquals(pathToAliases, pta); } diff --git a/ql/src/test/results/clientpositive/llap/orc_merge10.q.out b/ql/src/test/results/clientpositive/llap/orc_merge10.q.out index 7b69b39..3af8190 100644 --- a/ql/src/test/results/clientpositive/llap/orc_merge10.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_merge10.q.out @@ -632,7 +632,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: ds 1 part 0 table name: default.orcfile_merge1 diff --git a/ql/src/test/results/clientpositive/llap/orc_merge6.q.out b/ql/src/test/results/clientpositive/llap/orc_merge6.q.out index 7021220..95cbe1e 100644 --- a/ql/src/test/results/clientpositive/llap/orc_merge6.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_merge6.q.out @@ -522,7 +522,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: hour 24 year 2000 table name: default.orc_merge5a_n1 diff --git a/ql/src/test/results/clientpositive/llap/orc_merge7.q.out b/ql/src/test/results/clientpositive/llap/orc_merge7.q.out index 1f7b7b7..4155dc6 100644 --- a/ql/src/test/results/clientpositive/llap/orc_merge7.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_merge7.q.out @@ -653,7 +653,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: st 80.0 table name: default.orc_merge5a_n0 diff --git a/ql/src/test/results/clientpositive/llap/orc_merge_incompat2.q.out b/ql/src/test/results/clientpositive/llap/orc_merge_incompat2.q.out index 207541b..48217cd 100644 --- a/ql/src/test/results/clientpositive/llap/orc_merge_incompat2.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_merge_incompat2.q.out @@ -338,7 +338,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: st 80.0 table name: default.orc_merge5a diff --git a/ql/src/test/results/clientpositive/orc_merge10.q.out b/ql/src/test/results/clientpositive/orc_merge10.q.out index 1f70773..e8ebd4e 100644 --- a/ql/src/test/results/clientpositive/orc_merge10.q.out +++ b/ql/src/test/results/clientpositive/orc_merge10.q.out @@ -603,7 +603,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: ds 1 part 0 table name: default.orcfile_merge1 diff --git a/ql/src/test/results/clientpositive/orc_merge6.q.out b/ql/src/test/results/clientpositive/orc_merge6.q.out index bc05d2f..a07cfd3 100644 --- a/ql/src/test/results/clientpositive/orc_merge6.q.out +++ b/ql/src/test/results/clientpositive/orc_merge6.q.out @@ -486,7 +486,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: hour 24 year 2000 table name: default.orc_merge5a_n1 diff --git a/ql/src/test/results/clientpositive/orc_merge_incompat2.q.out b/ql/src/test/results/clientpositive/orc_merge_incompat2.q.out index 177bc25..71ae287 100644 --- a/ql/src/test/results/clientpositive/orc_merge_incompat2.q.out +++ b/ql/src/test/results/clientpositive/orc_merge_incompat2.q.out @@ -333,7 +333,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: st 80.0 table name: default.orc_merge5a diff --git a/ql/src/test/results/clientpositive/spark/orc_merge6.q.out b/ql/src/test/results/clientpositive/spark/orc_merge6.q.out index b3d1ca4..982b614 100644 --- a/ql/src/test/results/clientpositive/spark/orc_merge6.q.out +++ b/ql/src/test/results/clientpositive/spark/orc_merge6.q.out @@ -422,7 +422,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: hour 24 year 2000 table name: default.orc_merge5a_n1 diff --git a/ql/src/test/results/clientpositive/spark/orc_merge7.q.out b/ql/src/test/results/clientpositive/spark/orc_merge7.q.out index 0c2b8a0..a641ed7 100644 --- a/ql/src/test/results/clientpositive/spark/orc_merge7.q.out +++ b/ql/src/test/results/clientpositive/spark/orc_merge7.q.out @@ -553,7 +553,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: st 80.0 table name: default.orc_merge5a_n0 diff --git a/ql/src/test/results/clientpositive/spark/orc_merge_incompat2.q.out b/ql/src/test/results/clientpositive/spark/orc_merge_incompat2.q.out index 8da08bf..1bfbf4d 100644 --- a/ql/src/test/results/clientpositive/spark/orc_merge_incompat2.q.out +++ b/ql/src/test/results/clientpositive/spark/orc_merge_incompat2.q.out @@ -294,7 +294,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition spec: + partition desc: st 80.0 table name: default.orc_merge5a diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java index d75d709..c2ba3b0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java @@ -108,11 +108,15 @@ public class Msck { boolean success = false; long txnId = -1; int ret = 0; - long partitionExpirySeconds = msckInfo.getPartitionExpirySeconds(); try { Table table = getMsc().getTable(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName()); qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); - HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), partitionExpirySeconds); + if (getConf().getBoolean(MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION.getHiveName(), false)) { + msckInfo.setPartitionExpirySeconds(PartitionManagementTask.getRetentionPeriodInSeconds(table)); + LOG.info("{} - Retention period ({}s) for partition is enabled for MSCK REPAIR..", + qualifiedTableName, msckInfo.getPartitionExpirySeconds()); + } + HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), msckInfo.getPartitionExpirySeconds()); // checkMetastore call will fill in result with partitions that are present in filesystem // and missing in metastore - accessed through getPartitionsNotInMs // And partitions that are not present in filesystem and metadata exists in metastore - @@ -249,7 +253,7 @@ public class Msck { firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(), "Partitions missing from filesystem:", resultOut, firstWritten); firstWritten |= writeMsckResult(result.getExpiredPartitions(), - "Expired partitions (retention period: " + partitionExpirySeconds + "s) :", resultOut, firstWritten); + "Expired partitions (retention period: " + msckInfo.getPartitionExpirySeconds() + "s) :", resultOut, firstWritten); // sorting to stabilize qfile output (msck_repair_drop.q) Collections.sort(repairOutput); for (String rout : repairOutput) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java index 25d0c64..81bcb56 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java @@ -17,27 +17,29 @@ */ package org.apache.hadoop.hive.metastore; -import java.util.List; -import java.util.Map; +import java.util.ArrayList; +import java.util.LinkedHashMap; /** * Metadata related to Msck. */ public class MsckInfo { - private final String catalogName; - private final String dbName; - private final String tableName; - private final List<Map<String, String>> partSpecs; - private final String resFile; - private final boolean repairPartitions; - private final boolean addPartitions; - private final boolean dropPartitions; - private final long partitionExpirySeconds; - - public MsckInfo(String catalogName, String dbName, String tableName, List<Map<String, String>> partSpecs, - String resFile, boolean repairPartitions, boolean addPartitions, boolean dropPartitions, - long partitionExpirySeconds) { + private String catalogName; + private String dbName; + private String tableName; + private ArrayList<LinkedHashMap<String, String>> partSpecs; + private String resFile; + private boolean repairPartitions; + private boolean addPartitions; + private boolean dropPartitions; + private long partitionExpirySeconds; + + public MsckInfo(final String catalogName, final String dbName, final String tableName, + final ArrayList<LinkedHashMap<String, String>> partSpecs, final String resFile, final boolean repairPartitions, + final boolean addPartitions, + final boolean dropPartitions, + final long partitionExpirySeconds) { this.catalogName = catalogName; this.dbName = dbName; this.tableName = tableName; @@ -53,35 +55,71 @@ public class MsckInfo { return catalogName; } + public void setCatalogName(final String catalogName) { + this.catalogName = catalogName; + } + public String getDbName() { return dbName; } + public void setDbName(final String dbName) { + this.dbName = dbName; + } + public String getTableName() { return tableName; } - public List<Map<String, String>> getPartSpecs() { + public void setTableName(final String tableName) { + this.tableName = tableName; + } + + public ArrayList<LinkedHashMap<String, String>> getPartSpecs() { return partSpecs; } + public void setPartSpecs(final ArrayList<LinkedHashMap<String, String>> partSpecs) { + this.partSpecs = partSpecs; + } + public String getResFile() { return resFile; } + public void setResFile(final String resFile) { + this.resFile = resFile; + } + public boolean isRepairPartitions() { return repairPartitions; } + public void setRepairPartitions(final boolean repairPartitions) { + this.repairPartitions = repairPartitions; + } + public boolean isAddPartitions() { return addPartitions; } + public void setAddPartitions(final boolean addPartitions) { + this.addPartitions = addPartitions; + } + public boolean isDropPartitions() { return dropPartitions; } + public void setDropPartitions(final boolean dropPartitions) { + this.dropPartitions = dropPartitions; + } + public long getPartitionExpirySeconds() { return partitionExpirySeconds; } + + public void setPartitionExpirySeconds(final long partitionExpirySeconds) { + this.partitionExpirySeconds = partitionExpirySeconds; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index da0259c..59001b5 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -163,7 +163,7 @@ public class PartitionManagementTask implements MetastoreTaskThread { } } - public static long getRetentionPeriodInSeconds(final Table table) { + static long getRetentionPeriodInSeconds(final Table table) { String retentionPeriod; long retentionSeconds = -1; if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
