This is an automated email from the ASF dual-hosted git repository. jcamacho pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit 5a061296d7de5b530474146527318f651d142dfb Author: Miklos Gergely <[email protected]> AuthorDate: Thu Jul 11 20:51:32 2019 -0700 HIVE-21959: Clean up Concatenate and Msck DDL commands (Miklos Gergely, reviewed by Zoltan Haindrich) --- .../apache/hadoop/hive/ql/ddl/misc/MsckDesc.java | 13 ++-- .../hadoop/hive/ql/ddl/misc/MsckOperation.java | 24 ++++++- .../table/storage/AlterTableConcatenateDesc.java | 68 +++++++----------- .../storage/AlterTableConcatenateOperation.java | 83 +++++++++++++--------- .../apache/hadoop/hive/ql/exec/MapOperator.java | 4 +- .../org/apache/hadoop/hive/ql/exec/Utilities.java | 13 ++-- .../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 2 +- .../hive/ql/exec/vector/VectorMapOperator.java | 5 +- .../hadoop/hive/ql/io/CombineHiveInputFormat.java | 14 ++-- .../hadoop/hive/ql/io/CombineHiveRecordReader.java | 3 +- .../hadoop/hive/ql/io/HiveFileFormatUtils.java | 21 ++---- .../apache/hadoop/hive/ql/io/HiveInputFormat.java | 7 +- .../hadoop/hive/ql/io/SymbolicInputFormat.java | 4 +- .../hadoop/hive/ql/io/merge/MergeFileWork.java | 3 +- .../hive/ql/io/parquet/ProjectionPusher.java | 4 +- .../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 22 +++--- .../hadoop/hive/ql/optimizer/MapJoinProcessor.java | 6 +- .../physical/AbstractJoinTaskDispatcher.java | 8 +-- .../physical/CommonJoinTaskDispatcher.java | 4 +- .../optimizer/physical/NullScanTaskDispatcher.java | 6 +- .../physical/SortMergeJoinTaskDispatcher.java | 5 +- .../hive/ql/optimizer/physical/Vectorizer.java | 8 +-- .../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java | 21 ++---- .../hadoop/hive/ql/parse/MapReduceCompiler.java | 7 +- .../apache/hadoop/hive/ql/parse/TezCompiler.java | 2 +- .../hadoop/hive/ql/parse/spark/SparkCompiler.java | 2 +- .../ql/plan/ConditionalResolverCommonJoin.java | 16 ++--- .../ql/plan/ConditionalResolverMergeFiles.java | 4 +- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 49 +++++++------ .../hadoop/hive/ql/exec/TestGetInputSummary.java | 8 +-- .../apache/hadoop/hive/ql/exec/TestOperators.java | 4 +- .../org/apache/hadoop/hive/ql/exec/TestPlan.java | 4 +- .../apache/hadoop/hive/ql/exec/TestUtilities.java | 4 +- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 6 +- .../hive/ql/io/orc/TestInputOutputFormat.java | 4 +- .../ql/plan/TestConditionalResolverCommonJoin.java | 4 +- .../apache/hadoop/hive/ql/plan/TestMapWork.java | 7 +- .../results/clientpositive/llap/orc_merge10.q.out | 2 +- .../results/clientpositive/llap/orc_merge6.q.out | 2 +- .../results/clientpositive/llap/orc_merge7.q.out | 2 +- .../clientpositive/llap/orc_merge_incompat2.q.out | 2 +- .../test/results/clientpositive/orc_merge10.q.out | 2 +- .../test/results/clientpositive/orc_merge6.q.out | 2 +- .../clientpositive/orc_merge_incompat2.q.out | 2 +- .../results/clientpositive/spark/orc_merge6.q.out | 2 +- .../results/clientpositive/spark/orc_merge7.q.out | 2 +- .../clientpositive/spark/orc_merge_incompat2.q.out | 2 +- .../org/apache/hadoop/hive/metastore/Msck.java | 10 +-- .../org/apache/hadoop/hive/metastore/MsckInfo.java | 70 +++++------------- .../hive/metastore/PartitionManagementTask.java | 2 +- 50 files changed, 256 insertions(+), 315 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckDesc.java index 32a51fe..4f6f31e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckDesc.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.ddl.misc; import java.io.Serializable; -import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -36,19 +34,16 @@ public class MsckDesc implements DDLDesc, Serializable { private static final long serialVersionUID = 1L; private final String tableName; - private final ArrayList<LinkedHashMap<String, String>> partitionsSpecs; + private final List<Map<String, String>> partitionsSpecs; private final String resFile; private final boolean repairPartitions; private final boolean addPartitions; private final boolean dropPartitions; - public MsckDesc(String tableName, List<? extends Map<String, String>> partitionSpecs, Path resFile, + public MsckDesc(String tableName, List<Map<String, String>> partitionsSpecs, Path resFile, boolean repairPartitions, boolean addPartitions, boolean dropPartitions) { this.tableName = tableName; - this.partitionsSpecs = new ArrayList<LinkedHashMap<String, String>>(partitionSpecs.size()); - for (Map<String, String> partSpec : partitionSpecs) { - this.partitionsSpecs.add(new LinkedHashMap<>(partSpec)); - } + this.partitionsSpecs = partitionsSpecs; this.resFile = resFile.toString(); this.repairPartitions = repairPartitions; this.addPartitions = addPartitions; @@ -61,7 +56,7 @@ public class MsckDesc implements DDLDesc, Serializable { } @Explain(displayName = "partitions specs", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public ArrayList<LinkedHashMap<String, String>> getPartitionsSpecs() { + public List<Map<String, String>> getPartitionsSpecs() { return partitionsSpecs; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckOperation.java index dea0a05..ab8cf46 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/MsckOperation.java @@ -23,13 +23,19 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import java.io.IOException; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.Msck; import org.apache.hadoop.hive.metastore.MsckInfo; +import org.apache.hadoop.hive.metastore.PartitionManagementTask; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.ddl.DDLOperation; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.thrift.TException; /** * Operation process of metastore check. @@ -43,15 +49,29 @@ public class MsckOperation extends DDLOperation<MsckDesc> { } @Override - public int execute() throws HiveException, IOException { + public int execute() throws HiveException, IOException, TException { try { Msck msck = new Msck(false, false); msck.init(context.getDb().getConf()); String[] names = Utilities.getDbTableName(desc.getTableName()); + + long partitionExpirySeconds = -1L; + try (HiveMetaStoreClient msc = new HiveMetaStoreClient(context.getConf())) { + Table table = msc.getTable(SessionState.get().getCurrentCatalog(), names[0], names[1]); + String qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); + boolean msckEnablePartitionRetention = context.getConf().getBoolean( + MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION.getHiveName(), false); + if (msckEnablePartitionRetention) { + partitionExpirySeconds = PartitionManagementTask.getRetentionPeriodInSeconds(table); + LOG.info("{} - Retention period ({}s) for partition is enabled for MSCK REPAIR..", qualifiedTableName, + partitionExpirySeconds); + } + } + MsckInfo msckInfo = new MsckInfo(SessionState.get().getCurrentCatalog(), names[0], names[1], desc.getPartitionsSpecs(), desc.getResFile(), desc.isRepairPartitions(), desc.isAddPartitions(), - desc.isDropPartitions(), -1); + desc.isDropPartitions(), partitionExpirySeconds); return msck.repair(msckInfo); } catch (MetaException e) { LOG.error("Unable to create msck instance.", e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateDesc.java index 64ce2fe..281fcbf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateDesc.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; @@ -34,20 +32,25 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; * DDL task description for ALTER TABLE ... [PARTITION ... ] CONCATENATE commands. */ @Explain(displayName = "Concatenate", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +@SuppressWarnings("rawtypes") public class AlterTableConcatenateDesc implements DDLDesc { - private String tableName; - private Map<String, String> partSpec; - private ListBucketingCtx lbCtx; // context for list bucketing. - - private List<Path> inputDir = new ArrayList<Path>(); - private Path outputDir = null; - private Class<? extends InputFormat> inputFormatClass; - private TableDesc tableDesc; - - public AlterTableConcatenateDesc(String tableName, - Map<String, String> partSpec) { + private final String tableName; + private final Map<String, String> partitionSpec; + private final ListBucketingCtx lbCtx; + private final Path inputDir; + private final Path outputDir; + private final Class<? extends InputFormat> inputFormatClass; + private final TableDesc tableDesc; + + public AlterTableConcatenateDesc(String tableName, Map<String, String> partitionSpec, ListBucketingCtx lbCtx, + Path inputDir, Path outputDir, Class<? extends InputFormat> inputFormatClass, TableDesc tableDesc) { this.tableName = tableName; - this.partSpec = partSpec; + this.partitionSpec = partitionSpec; + this.lbCtx = lbCtx; + this.inputDir = inputDir; + this.outputDir = outputDir; + this.inputFormatClass = inputFormatClass; + this.tableDesc = tableDesc; } @Explain(displayName = "table name") @@ -55,47 +58,28 @@ public class AlterTableConcatenateDesc implements DDLDesc { return tableName; } - @Explain(displayName = "partition desc") - public Map<String, String> getPartSpec() { - return partSpec; + /** For Explain only. */ + @Explain(displayName = "partition spec") + public Map<String, String> getPartitionSpec() { + return partitionSpec; } - public Path getOutputDir() { - return outputDir; - } - - public void setOutputDir(Path outputDir) { - this.outputDir = outputDir; + public ListBucketingCtx getLbCtx() { + return lbCtx; } - public List<Path> getInputDir() { + public Path getInputDir() { return inputDir; } - public void setInputDir(List<Path> inputDir) { - this.inputDir = inputDir; - } - - public ListBucketingCtx getLbCtx() { - return lbCtx; - } - - public void setLbCtx(ListBucketingCtx lbCtx) { - this.lbCtx = lbCtx; + public Path getOutputDir() { + return outputDir; } public Class<? extends InputFormat> getInputFormatClass() { return inputFormatClass; } - public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) { - this.inputFormatClass = inputFormatClass; - } - - public void setTableDesc(TableDesc tableDesc) { - this.tableDesc = tableDesc; - } - public TableDesc getTableDesc() { return tableDesc; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateOperation.java index 0afc357..718c21d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableConcatenateOperation.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; -import java.io.Serializable; -import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -43,6 +43,8 @@ import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc; import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc; import org.apache.hadoop.hive.ql.plan.TezWork; +import com.google.common.collect.Lists; + /** * Operation process of concatenating the files of a table/partition. */ @@ -53,29 +55,48 @@ public class AlterTableConcatenateOperation extends DDLOperation<AlterTableConca @Override public int execute() throws HiveException { - ListBucketingCtx lbCtx = desc.getLbCtx(); - boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir(); - int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel(); + CompilationOpContext opContext = context.getDriverContext().getCtx().getOpContext(); + + MergeFileWork mergeWork = getMergeFileWork(opContext); + Task<?> task = getTask(mergeWork); + return executeTask(opContext, task); + } + + private MergeFileWork getMergeFileWork(CompilationOpContext opContext) { + List<Path> inputDirList = Lists.newArrayList(desc.getInputDir()); // merge work only needs input and output. - MergeFileWork mergeWork = new MergeFileWork(desc.getInputDir(), desc.getOutputDir(), + MergeFileWork mergeWork = new MergeFileWork(inputDirList, desc.getOutputDir(), desc.getInputFormatClass().getName(), desc.getTableDesc()); - LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); - ArrayList<String> inputDirstr = new ArrayList<String>(1); - inputDirstr.add(desc.getInputDir().toString()); - pathToAliases.put(desc.getInputDir().get(0), inputDirstr); - mergeWork.setPathToAliases(pathToAliases); mergeWork.setListBucketingCtx(desc.getLbCtx()); mergeWork.resolveConcatenateMerge(context.getDb().getConf()); mergeWork.setMapperCannotSpanPartns(true); mergeWork.setSourceTableInputFormat(desc.getInputFormatClass().getName()); - final FileMergeDesc fmd; - if (desc.getInputFormatClass().equals(RCFileInputFormat.class)) { - fmd = new RCFileMergeDesc(); - } else { - // safe to assume else is ORC as semantic analyzer will check for RC/ORC - fmd = new OrcFileMergeDesc(); - } + + Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); + List<String> inputDirStr = Lists.newArrayList(inputDirList.toString()); + pathToAliases.put(desc.getInputDir(), inputDirStr); + mergeWork.setPathToAliases(pathToAliases); + + FileMergeDesc fmd = getFileMergeDesc(); + Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(opContext, fmd); + Map<String, Operator<? extends OperatorDesc>> aliasToWork = + new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); + aliasToWork.put(inputDirList.toString(), mergeOp); + mergeWork.setAliasToWork(aliasToWork); + + return mergeWork; + } + + private FileMergeDesc getFileMergeDesc() { + // safe to assume else is ORC as semantic analyzer will check for RC/ORC + FileMergeDesc fmd = (desc.getInputFormatClass().equals(RCFileInputFormat.class)) ? + new RCFileMergeDesc() : + new OrcFileMergeDesc(); + + ListBucketingCtx lbCtx = desc.getLbCtx(); + boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir(); + int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel(); fmd.setDpCtx(null); fmd.setHasDynamicPartitions(false); @@ -83,32 +104,30 @@ public class AlterTableConcatenateOperation extends DDLOperation<AlterTableConca fmd.setListBucketingDepth(lbd); fmd.setOutputPath(desc.getOutputDir()); - CompilationOpContext opContext = context.getDriverContext().getCtx().getOpContext(); - Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(opContext, fmd); + return fmd; + } - LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = - new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); - aliasToWork.put(desc.getInputDir().toString(), mergeOp); - mergeWork.setAliasToWork(aliasToWork); - DriverContext driverCxt = new DriverContext(); - Task<?> task; + private Task<?> getTask(MergeFileWork mergeWork) { if (context.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { TezWork tezWork = new TezWork(context.getQueryState().getQueryId(), context.getConf()); mergeWork.setName("File Merge"); tezWork.add(mergeWork); - task = new TezTask(); + Task<?> task = new TezTask(); ((TezTask) task).setWork(tezWork); + return task; } else { - task = new MergeFileTask(); + Task<?> task = new MergeFileTask(); ((MergeFileTask) task).setWork(mergeWork); + return task; } + } - // initialize the task and execute + private int executeTask(CompilationOpContext opContext, Task<?> task) { + DriverContext driverCxt = new DriverContext(); task.initialize(context.getQueryState(), context.getQueryPlan(), driverCxt, opContext); - Task<? extends Serializable> subtask = task; int ret = task.execute(driverCxt); - if (subtask.getException() != null) { - context.getTask().setException(subtask.getException()); + if (task.getException() != null) { + context.getTask().setException(task.getException()); } return ret; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index 1cbc272..c16aad8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -343,7 +343,7 @@ public class MapOperator extends AbstractMapOperator { private Map<String, Configuration> cloneConfsForNestedColPruning(Configuration hconf) { Map<String, Configuration> tableNameToConf = new HashMap<>(); - for (Map.Entry<Path, ArrayList<String>> e : conf.getPathToAliases().entrySet()) { + for (Map.Entry<Path, List<String>> e : conf.getPathToAliases().entrySet()) { List<String> aliases = e.getValue(); if (aliases == null || aliases.isEmpty()) { continue; @@ -426,7 +426,7 @@ public class MapOperator extends AbstractMapOperator { Map<String, Configuration> tableNameToConf = cloneConfsForNestedColPruning(hconf); Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(tableNameToConf); - for (Map.Entry<Path, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) { + for (Map.Entry<Path, List<String>> entry : conf.getPathToAliases().entrySet()) { Path onefile = entry.getKey(); List<String> aliases = entry.getValue(); PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 6b8e286..4372663 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2384,7 +2384,7 @@ public final class Utilities { final Configuration myConf = conf; final JobConf myJobConf = jobConf; final Map<String, Operator<?>> aliasToWork = work.getAliasToWork(); - final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases(); + final Map<Path, List<String>> pathToAlias = work.getPathToAliases(); final PartitionDesc partDesc = work.getPathToPartitionInfo().get(path); Runnable r = new Runnable() { @Override @@ -3219,8 +3219,7 @@ public final class Utilities { LOG.info("Processing alias {}", alias); // The alias may not have any path - Collection<Map.Entry<Path, ArrayList<String>>> pathToAliases = - work.getPathToAliases().entrySet(); + Collection<Map.Entry<Path, List<String>>> pathToAliases = work.getPathToAliases().entrySet(); if (!skipDummy) { // ConcurrentModification otherwise if adding dummy. pathToAliases = new ArrayList<>(pathToAliases); @@ -3228,7 +3227,7 @@ public final class Utilities { boolean isEmptyTable = true; boolean hasLogged = false; - for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) { + for (Map.Entry<Path, List<String>> e : pathToAliases) { if (lDrvStat != null && lDrvStat.isAborted()) { throw new IOException("Operation is Canceled."); } @@ -3447,8 +3446,8 @@ public final class Utilities { // update the work - LinkedHashMap<Path, ArrayList<String>> pathToAliases = work.getPathToAliases(); - ArrayList<String> newList = new ArrayList<String>(1); + Map<Path, List<String>> pathToAliases = work.getPathToAliases(); + List<String> newList = new ArrayList<String>(1); newList.add(alias); pathToAliases.put(newPath, newList); @@ -3510,7 +3509,7 @@ public final class Utilities { public static void createTmpDirs(Configuration conf, MapWork mWork) throws IOException { - Map<Path, ArrayList<String>> pa = mWork.getPathToAliases(); + Map<Path, List<String>> pa = mWork.getPathToAliases(); if (MapUtils.isNotEmpty(pa)) { // common case: 1 table scan per map-work // rare case: smb joins diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index f06ac37..3278dfe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -261,7 +261,7 @@ public class DagUtils { Set<URI> fileSinkUris = new HashSet<URI>(); List<Node> topNodes = new ArrayList<Node>(); - LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); + Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); for (Operator<? extends OperatorDesc> operator : aliasToWork.values()) { topNodes.add(operator); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index 5a903d3..308de1a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -28,11 +28,9 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -46,7 +44,6 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -561,7 +558,7 @@ public class VectorMapOperator extends AbstractMapOperator { HashMap<PartitionDesc, VectorPartitionContext> partitionContextMap = new HashMap<PartitionDesc, VectorPartitionContext>(); - for (Map.Entry<Path, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) { + for (Map.Entry<Path, List<String>> entry : conf.getPathToAliases().entrySet()) { Path path = entry.getKey(); PartitionDesc partDesc = conf.getPathToPartitionInfo().get(path); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 5f2539f..1844ce0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -334,7 +334,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException { init(job); - Map<Path, ArrayList<String>> pathToAliases = mrwork.getPathToAliases(); + Map<Path, List<String>> pathToAliases = mrwork.getPathToAliases(); Map<String, Operator<? extends OperatorDesc>> aliasToWork = mrwork.getAliasToWork(); CombineFileInputFormatShim combine = ShimLoader.getHadoopShims() @@ -608,11 +608,11 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ * @return the sampled splits */ private List<CombineFileSplit> sampleSplits(List<CombineFileSplit> splits) { - HashMap<String, SplitSample> nameToSamples = mrwork.getNameToSplitSample(); + Map<String, SplitSample> nameToSamples = mrwork.getNameToSplitSample(); List<CombineFileSplit> retLists = new ArrayList<CombineFileSplit>(); Map<String, ArrayList<CombineFileSplit>> aliasToSplitList = new HashMap<String, ArrayList<CombineFileSplit>>(); - Map<Path, ArrayList<String>> pathToAliases = mrwork.getPathToAliases(); - Map<Path, ArrayList<String>> pathToAliasesNoScheme = removeScheme(pathToAliases); + Map<Path, List<String>> pathToAliases = mrwork.getPathToAliases(); + Map<Path, List<String>> pathToAliasesNoScheme = removeScheme(pathToAliases); // Populate list of exclusive splits for every sampled alias // @@ -681,9 +681,9 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ return retLists; } - Map<Path, ArrayList<String>> removeScheme(Map<Path, ArrayList<String>> pathToAliases) { - Map<Path, ArrayList<String>> result = new HashMap<>(); - for (Map.Entry <Path, ArrayList<String>> entry : pathToAliases.entrySet()) { + Map<Path, List<String>> removeScheme(Map<Path, List<String>> pathToAliases) { + Map<Path, List<String>> result = new HashMap<>(); + for (Map.Entry <Path, List<String>> entry : pathToAliases.entrySet()) { Path newKey = Path.getPathWithoutSchemeAndAuthority(entry.getKey()); StringInternUtils.internUriStringsInPath(newKey); result.put(newKey, entry.getValue()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java index 07824c0..0d2eb0a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -53,7 +52,7 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri extends HiveContextAwareRecordReader<K, V> { private org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(CombineHiveRecordReader.class); - private LinkedHashMap<Path, PartitionDesc> pathToPartInfo; + private Map<Path, PartitionDesc> pathToPartInfo; public CombineHiveRecordReader(InputSplit split, Configuration conf, Reporter reporter, Integer partition, RecordReader preReader) throws IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index f75ed5d..8980a62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -23,13 +23,11 @@ import java.nio.file.FileSystemNotFoundException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import java.util.Set; @@ -66,7 +64,6 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.util.Shell; import org.apache.hive.common.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -420,8 +417,7 @@ public final class HiveFileFormatUtils { return pathToPartitionInfo.get(path); } - private static boolean foundAlias(Map<Path, ArrayList<String>> pathToAliases, - Path path) { + private static boolean foundAlias(Map<Path, List<String>> pathToAliases, Path path) { List<String> aliases = pathToAliases.get(path); if ((aliases == null) || (aliases.isEmpty())) { return false; @@ -429,8 +425,7 @@ public final class HiveFileFormatUtils { return true; } - private static Path getMatchingPath(Map<Path, ArrayList<String>> pathToAliases, - Path dir) { + private static Path getMatchingPath(Map<Path, List<String>> pathToAliases, Path dir) { // First find the path to be searched Path path = dir; if (foundAlias(pathToAliases, path)) { @@ -462,11 +457,9 @@ public final class HiveFileFormatUtils { * @param aliasToWork The operator tree to be invoked for a given alias * @param dir The path to look for **/ - public static List<Operator<? extends OperatorDesc>> doGetWorksFromPath( - Map<Path, ArrayList<String>> pathToAliases, - Map<String, Operator<? extends OperatorDesc>> aliasToWork, Path dir) { - List<Operator<? extends OperatorDesc>> opList = - new ArrayList<Operator<? extends OperatorDesc>>(); + public static List<Operator<? extends OperatorDesc>> doGetWorksFromPath(Map<Path, List<String>> pathToAliases, + Map<String, Operator<? extends OperatorDesc>> aliasToWork, Path dir) { + List<Operator<? extends OperatorDesc>> opList = new ArrayList<Operator<? extends OperatorDesc>>(); List<String> aliases = doGetAliasesFromPath(pathToAliases, dir); for (String alias : aliases) { @@ -480,9 +473,7 @@ public final class HiveFileFormatUtils { * @param pathToAliases mapping from path to aliases * @param dir The path to look for **/ - public static List<String> doGetAliasesFromPath( - Map<Path, ArrayList<String>> pathToAliases, - Path dir) { + public static List<String> doGetAliasesFromPath(Map<Path, List<String>> pathToAliases, Path dir) { if (pathToAliases == null) { return new ArrayList<String>(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 4bd4a24..cff7e04 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -878,13 +878,12 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } ArrayList<String> aliases = new ArrayList<String>(); - Iterator<Entry<Path, ArrayList<String>>> iterator = this.mrwork - .getPathToAliases().entrySet().iterator(); + Iterator<Entry<Path, List<String>>> iterator = this.mrwork.getPathToAliases().entrySet().iterator(); Set<Path> splitParentPaths = null; int pathsSize = this.mrwork.getPathToAliases().entrySet().size(); while (iterator.hasNext()) { - Entry<Path, ArrayList<String>> entry = iterator.next(); + Entry<Path, List<String>> entry = iterator.next(); Path key = entry.getKey(); boolean match; if (nonNative) { @@ -914,7 +913,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } } if (match) { - ArrayList<String> list = entry.getValue(); + List<String> list = entry.getValue(); for (String val : list) { aliases.add(val); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java index 26f7733..30957ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java @@ -43,7 +43,7 @@ public class SymbolicInputFormat implements ReworkMapredInputFormat { Map<Path, PartitionDesc> pathToParts = work.getMapWork().getPathToPartitionInfo(); List<Path> toRemovePaths = new ArrayList<>(); Map<Path, PartitionDesc> toAddPathToPart = new HashMap<>(); - Map<Path, ArrayList<String>> pathToAliases = work.getMapWork().getPathToAliases(); + Map<Path, List<String>> pathToAliases = work.getMapWork().getPathToAliases(); for (Map.Entry<Path, PartitionDesc> pathPartEntry : pathToParts.entrySet()) { Path path = pathPartEntry.getKey(); @@ -62,7 +62,7 @@ public class SymbolicInputFormat implements ReworkMapredInputFormat { symlinks = fileSystem.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); } toRemovePaths.add(path); - ArrayList<String> aliases = pathToAliases.remove(path); + List<String> aliases = pathToAliases.remove(path); for (FileStatus symlink : symlinks) { BufferedReader reader = null; try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java index 3044603..594289e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java @@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.InputFormat; import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; @Explain(displayName = "Merge File Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -132,7 +131,7 @@ public class MergeFileWork extends MapWork { public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path, TableDesc tblDesc, - ArrayList<String> aliases, + List<String> aliases, PartitionDesc partDesc) { super.resolveDynamicPartitionStoredAsSubDirsMerge(conf, path, tblDesc, aliases, partDesc); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index 0444562..6d525ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -18,11 +18,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hive.common.StringInternUtils; @@ -87,7 +85,7 @@ public class ProjectionPusher { final Set<String> aliases = new HashSet<String>(); try { - ArrayList<String> a = HiveFileFormatUtils.getFromPathRecursively( + List<String> a = HiveFileFormatUtils.getFromPathRecursively( mapWork.getPathToAliases(), new Path(splitPath), null, false, true); if (a != null) { aliases.addAll(a); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 3277765..5d6143d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -117,7 +117,6 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.InputFormat; @@ -862,8 +861,7 @@ public final class GenMapRedUtils { } else if (task instanceof ExecDriver) { MapredWork work = (MapredWork) task.getWork(); work.getMapWork().deriveExplainAttributes(); - HashMap<String, Operator<? extends OperatorDesc>> opMap = work - .getMapWork().getAliasToWork(); + Map<String, Operator<? extends OperatorDesc>> opMap = work.getMapWork().getAliasToWork(); if (opMap != null && !opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { setKeyAndValueDesc(work.getReduceWork(), op); @@ -985,7 +983,7 @@ public final class GenMapRedUtils { conf.getBoolVar( HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS); work.setMapperCannotSpanPartns(mapperCannotSpanPartns); - work.setPathToAliases(new LinkedHashMap<Path, ArrayList<String>>()); + work.setPathToAliases(new LinkedHashMap<Path, List<String>>()); work.setPathToPartitionInfo(new LinkedHashMap<Path, PartitionDesc>()); work.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>()); return mrWork; @@ -1164,13 +1162,13 @@ public final class GenMapRedUtils { */ public static void replaceMapWork(String sourceAlias, String targetAlias, MapWork source, MapWork target) { - Map<Path, ArrayList<String>> sourcePathToAliases = source.getPathToAliases(); + Map<Path, List<String>> sourcePathToAliases = source.getPathToAliases(); Map<Path, PartitionDesc> sourcePathToPartitionInfo = source.getPathToPartitionInfo(); Map<String, Operator<? extends OperatorDesc>> sourceAliasToWork = source.getAliasToWork(); Map<String, PartitionDesc> sourceAliasToPartnInfo = source.getAliasToPartnInfo(); - LinkedHashMap<Path, ArrayList<String>> targetPathToAliases = target.getPathToAliases(); - LinkedHashMap<Path, PartitionDesc> targetPathToPartitionInfo = target.getPathToPartitionInfo(); + Map<Path, List<String>> targetPathToAliases = target.getPathToAliases(); + Map<Path, PartitionDesc> targetPathToPartitionInfo = target.getPathToPartitionInfo(); Map<String, Operator<? extends OperatorDesc>> targetAliasToWork = target.getAliasToWork(); Map<String, PartitionDesc> targetAliasToPartnInfo = target.getAliasToPartnInfo(); @@ -1192,8 +1190,8 @@ public final class GenMapRedUtils { targetAliasToWork.remove(targetAlias); targetAliasToPartnInfo.remove(targetAlias); List<Path> pathsToRemove = new ArrayList<>(); - for (Entry<Path, ArrayList<String>> entry: targetPathToAliases.entrySet()) { - ArrayList<String> aliases = entry.getValue(); + for (Entry<Path, List<String>> entry: targetPathToAliases.entrySet()) { + List<String> aliases = entry.getValue(); aliases.remove(targetAlias); if (aliases.isEmpty()) { pathsToRemove.add(entry.getKey()); @@ -1209,8 +1207,8 @@ public final class GenMapRedUtils { targetAliasToPartnInfo.putAll(sourceAliasToPartnInfo); targetPathToPartitionInfo.putAll(sourcePathToPartitionInfo); List<Path> pathsToAdd = new ArrayList<>(); - for (Entry<Path, ArrayList<String>> entry: sourcePathToAliases.entrySet()) { - ArrayList<String> aliases = entry.getValue(); + for (Entry<Path, List<String>> entry: sourcePathToAliases.entrySet()) { + List<String> aliases = entry.getValue(); if (aliases.contains(sourceAlias)) { pathsToAdd.add(entry.getKey()); } @@ -1653,7 +1651,7 @@ public final class GenMapRedUtils { // create the merge file work MergeFileWork work = new MergeFileWork(inputDirs, finalName, hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName(), tblDesc); - LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); + Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); pathToAliases.put(inputDir, inputDirstr); work.setMapperCannotSpanPartns(true); work.setPathToAliases(pathToAliases); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index 1256e1c..5ed43c7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -147,14 +147,14 @@ public class MapJoinProcessor extends Transform { smallTableAliasList.add(alias); // get input path and remove this alias from pathToAlias // because this file will be fetched by fetch operator - LinkedHashMap<Path, ArrayList<String>> pathToAliases = newWork.getMapWork().getPathToAliases(); + Map<Path, List<String>> pathToAliases = newWork.getMapWork().getPathToAliases(); // keep record all the input path for this alias HashSet<Path> pathSet = new HashSet<>(); HashSet<Path> emptyPath = new HashSet<>(); - for (Map.Entry<Path, ArrayList<String>> entry2 : pathToAliases.entrySet()) { + for (Map.Entry<Path, List<String>> entry2 : pathToAliases.entrySet()) { Path path = entry2.getKey(); - ArrayList<String> list = entry2.getValue(); + List<String> list = entry2.getValue(); if (list.contains(alias)) { // add to path set pathSet.add(path); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java index 0b5de81..4ac2567 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.optimizer.physical; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Stack; @@ -119,8 +117,8 @@ public abstract class AbstractJoinTaskDispatcher implements Dispatcher { } public long getTotalKnownInputSize(Context context, MapWork currWork, - Map<Path, ArrayList<String>> pathToAliases, - HashMap<String, Long> aliasToSize) throws SemanticException { + Map<Path, List<String>> pathToAliases, + Map<String, Long> aliasToSize) throws SemanticException { try { // go over all the input paths, and calculate a known total size, known // size for each input alias. @@ -130,7 +128,7 @@ public abstract class AbstractJoinTaskDispatcher implements Dispatcher { // is chosen as big table, what's the total size of left tables, which // are going to be small tables. long aliasTotalKnownInputSize = 0L; - for (Map.Entry<Path, ArrayList<String>> entry : pathToAliases.entrySet()) { + for (Map.Entry<Path, List<String>> entry : pathToAliases.entrySet()) { Path path = entry.getKey(); List<String> aliasList = entry.getValue(); ContentSummary cs = context.getCS(path); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index e564daf..0d9d5e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -241,7 +241,7 @@ public class CommonJoinTaskDispatcher extends AbstractJoinTaskDispatcher impleme String childMRAlias = childMRAliases.get(0); // Sanity check to make sure there is no alias conflict after merge. - for (Entry<Path, ArrayList<String>> entry : childMapWork.getPathToAliases().entrySet()) { + for (Entry<Path, List<String>> entry : childMapWork.getPathToAliases().entrySet()) { Path path = entry.getKey(); List<String> aliases = entry.getValue(); @@ -392,7 +392,7 @@ public class CommonJoinTaskDispatcher extends AbstractJoinTaskDispatcher impleme // Must be deterministic order map for consistent q-test output across Java versions HashMap<Task<? extends Serializable>, Set<String>> taskToAliases = new LinkedHashMap<Task<? extends Serializable>, Set<String>>(); - HashMap<Path, ArrayList<String>> pathToAliases = currWork.getPathToAliases(); + Map<Path, List<String>> pathToAliases = currWork.getPathToAliases(); Map<String, Operator<? extends OperatorDesc>> aliasToWork = currWork.getAliasToWork(); // start to generate multiple map join tasks diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java index ec9813d..b7dd90d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java @@ -148,14 +148,14 @@ public class NullScanTaskDispatcher implements Dispatcher { tso.getConf().setIsMetadataOnly(true); } // group path alias according to work - Map<Path, ArrayList<String>> candidates = new HashMap<>(); + Map<Path, List<String>> candidates = new HashMap<>(); for (Path path : work.getPaths()) { - ArrayList<String> aliasesAffected = work.getPathToAliases().get(path); + List<String> aliasesAffected = work.getPathToAliases().get(path); if (CollectionUtils.isNotEmpty(aliasesAffected)) { candidates.put(path, aliasesAffected); } } - for (Entry<Path, ArrayList<String>> entry : candidates.entrySet()) { + for (Entry<Path, List<String>> entry : candidates.entrySet()) { processAlias(work, entry.getKey(), entry.getValue(), aliases); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index af3edf8..ebf1708 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor; -import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx; @@ -77,7 +76,7 @@ public class SortMergeJoinTaskDispatcher extends AbstractJoinTaskDispatcher impl Map<String, PartitionDesc> aliasToPartitionInfo = currWork.getAliasToPartnInfo(); List<Path> removePaths = new ArrayList<>(); - for (Map.Entry<Path, ArrayList<String>> entry : currWork.getPathToAliases().entrySet()) { + for (Map.Entry<Path, List<String>> entry : currWork.getPathToAliases().entrySet()) { boolean keepPath = false; for (String alias : entry.getValue()) { if (aliasToPartitionInfo.containsKey(alias)) { @@ -260,7 +259,7 @@ public class SortMergeJoinTaskDispatcher extends AbstractJoinTaskDispatcher impl HashMap<Task<? extends Serializable>, Set<String>> taskToAliases = new LinkedHashMap<Task<? extends Serializable>, Set<String>>(); // Note that pathToAlias will behave as if the original plan was a join plan - HashMap<Path, ArrayList<String>> pathToAliases = currJoinWork.getMapWork().getPathToAliases(); + Map<Path, List<String>> pathToAliases = currJoinWork.getMapWork().getPathToAliases(); // generate a map join task for the big table SMBJoinDesc originalSMBJoinDesc = originalSMBJoinOp.getConf(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 1cf44b3..cbb3df0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -1207,7 +1207,7 @@ public class Vectorizer implements PhysicalPlanResolver { // Eliminate MR plans with more than one TableScanOperator. - LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); + Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); if ((aliasToWork == null) || (aliasToWork.size() == 0)) { setNodeIssue("Vectorized map work requires work"); return null; @@ -1714,8 +1714,8 @@ public class Vectorizer implements PhysicalPlanResolver { List<String> tableDataColumnList = null; List<TypeInfo> tableDataTypeInfoList = null; - LinkedHashMap<Path, ArrayList<String>> pathToAliases = mapWork.getPathToAliases(); - LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo(); + Map<Path, List<String>> pathToAliases = mapWork.getPathToAliases(); + Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo(); // Remember the input file formats we validated and why. Set<String> inputFileFormatClassNameSet = new HashSet<String>(); @@ -1726,7 +1726,7 @@ public class Vectorizer implements PhysicalPlanResolver { Set<Support> inputFormatSupportSet = new TreeSet<Support>(); boolean outsideLoopIsFirstPartition = true; - for (Entry<Path, ArrayList<String>> entry: pathToAliases.entrySet()) { + for (Entry<Path, List<String>> entry: pathToAliases.entrySet()) { final boolean isFirstPartition = outsideLoopIsFirstPartition; outsideLoopIsFirstPartition = false; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 698d7fe..ff7f9a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -1995,10 +1995,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { private void analyzeAlterTablePartMergeFiles(ASTNode ast, String tableName, HashMap<String, String> partSpec) throws SemanticException { - AlterTableConcatenateDesc mergeDesc = new AlterTableConcatenateDesc( - tableName, partSpec); - List<Path> inputDir = new ArrayList<Path>(); Path oldTblPartLoc = null; Path newTblPartLoc = null; Table tblObj = null; @@ -2019,7 +2016,6 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc))); return; } - mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj)); List<String> bucketCols = null; Class<? extends InputFormat> inputFormatClass = null; @@ -2064,11 +2060,9 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } // throw a HiveException for other than rcfile and orcfile. - if (!((inputFormatClass.equals(RCFileInputFormat.class) || - (inputFormatClass.equals(OrcInputFormat.class))))) { + if (!(inputFormatClass.equals(RCFileInputFormat.class) || inputFormatClass.equals(OrcInputFormat.class))) { throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_FILE_FORMAT.getMsg()); } - mergeDesc.setInputFormatClass(inputFormatClass); // throw a HiveException if the table/partition is bucketized if (bucketCols != null && bucketCols.size() > 0) { @@ -2090,19 +2084,14 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED.getMsg()); } - inputDir.add(oldTblPartLoc); - - mergeDesc.setInputDir(inputDir); - - mergeDesc.setLbCtx(lbCtx); - addInputsOutputsAlterTable(tableName, partSpec, null, AlterTableType.MERGEFILES, false); + TableDesc tblDesc = Utilities.getTableDesc(tblObj); + Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); + AlterTableConcatenateDesc mergeDesc = new AlterTableConcatenateDesc(tableName, partSpec, lbCtx, oldTblPartLoc, + queryTmpdir, inputFormatClass, Utilities.getTableDesc(tblObj)); DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), mergeDesc); ddlWork.setNeedLock(true); Task<?> mergeTask = TaskFactory.get(ddlWork); - TableDesc tblDesc = Utilities.getTableDesc(tblObj); - Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); - mergeDesc.setOutputDir(queryTmpdir); // No need to handle MM tables - unsupported path. LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap<>() : partSpec); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java index 41a3b00..42637df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.parse; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -97,7 +96,7 @@ public class MapReduceCompiler extends TaskCompiler { protected void setInputFormat(Task<? extends Serializable> task) { if (task instanceof ExecDriver) { MapWork work = ((MapredWork) task.getWork()).getMapWork(); - HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork(); + Map<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork(); if (!opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { setInputFormat(work, op); @@ -141,8 +140,8 @@ public class MapReduceCompiler extends TaskCompiler { private void breakTaskTree(Task<? extends Serializable> task) { if (task instanceof ExecDriver) { - HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task - .getWork()).getMapWork().getAliasToWork(); + Map<String, Operator<? extends OperatorDesc>> opMap = + ((MapredWork) task.getWork()).getMapWork().getAliasToWork(); if (!opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { breakOperatorTree(op); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 8c3ee0c..5000ba4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -697,7 +697,7 @@ public class TezCompiler extends TaskCompiler { for (BaseWork w: all) { if (w instanceof MapWork) { MapWork mapWork = (MapWork) w; - HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork(); + Map<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork(); if (!opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { setInputFormat(mapWork, op); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 5bf5502..6bc5925 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -530,7 +530,7 @@ public class SparkCompiler extends TaskCompiler { for (BaseWork w: all) { if (w instanceof MapWork) { MapWork mapWork = (MapWork) w; - HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork(); + Map<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork(); if (!opMap.isEmpty()) { for (Operator<? extends OperatorDesc> op : opMap.values()) { setInputFormat(mapWork, op); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java index 7c1dc45..cc5baee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java @@ -21,11 +21,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Map.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +51,8 @@ public class ConditionalResolverCommonJoin implements ConditionalResolver, Seria private static final long serialVersionUID = 1L; private HashMap<Task<? extends Serializable>, Set<String>> taskToAliases; - HashMap<Path, ArrayList<String>> pathToAliases; - HashMap<String, Long> aliasToKnownSize; + Map<Path, List<String>> pathToAliases; + Map<String, Long> aliasToKnownSize; private Task<? extends Serializable> commonJoinTask; private Path localTmpDir; @@ -79,7 +77,7 @@ public class ConditionalResolverCommonJoin implements ConditionalResolver, Seria this.commonJoinTask = commonJoinTask; } - public HashMap<String, Long> getAliasToKnownSize() { + public Map<String, Long> getAliasToKnownSize() { return aliasToKnownSize == null ? aliasToKnownSize = new HashMap<String, Long>() : aliasToKnownSize; } @@ -88,11 +86,11 @@ public class ConditionalResolverCommonJoin implements ConditionalResolver, Seria this.aliasToKnownSize = aliasToKnownSize; } - public HashMap<Path, ArrayList<String>> getPathToAliases() { + public Map<Path, List<String>> getPathToAliases() { return pathToAliases; } - public void setPathToAliases(final HashMap<Path, ArrayList<String>> pathToAliases) { + public void setPathToAliases(Map<Path, List<String>> pathToAliases) { this.pathToAliases = pathToAliases; } @@ -214,10 +212,10 @@ public class ConditionalResolverCommonJoin implements ConditionalResolver, Seria Set<String> aliases = getParticipants(ctx); Map<String, Long> aliasToKnownSize = ctx.getAliasToKnownSize(); - Map<Path, ArrayList<String>> pathToAliases = ctx.getPathToAliases(); + Map<Path, List<String>> pathToAliases = ctx.getPathToAliases(); Set<Path> unknownPaths = new HashSet<>(); - for (Map.Entry<Path, ArrayList<String>> entry : pathToAliases.entrySet()) { + for (Map.Entry<Path, List<String>> entry : pathToAliases.entrySet()) { for (String alias : entry.getValue()) { if (aliases.contains(alias) && !aliasToKnownSize.containsKey(alias)) { unknownPaths.add(entry.getKey()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 54c9659..a828809 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -248,10 +248,10 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, work.removePathToPartitionInfo(path); // the root path is not useful anymore // cleanup pathToAliases - LinkedHashMap<Path, ArrayList<String>> pta = work.getPathToAliases(); + Map<Path, List<String>> pta = work.getPathToAliases(); assert pta.size() == 1; path = pta.keySet().iterator().next(); - ArrayList<String> aliases = pta.get(path); + List<String> aliases = pta.get(path); work.removePathToAlias(path); // the root path is not useful anymore // populate pathToPartitionInfo and pathToAliases w/ DP paths diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index bb063c5..1d06435 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -94,15 +94,16 @@ public class MapWork extends BaseWork { // use LinkedHashMap to make sure the iteration order is // deterministic, to ease testing - private LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); + private Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); - private LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>(); + private Map<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>(); - private LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); + private Map<String, Operator<? extends OperatorDesc>> aliasToWork = + new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); - private LinkedHashMap<String, PartitionDesc> aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>(); + private Map<String, PartitionDesc> aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>(); - private HashMap<String, SplitSample> nameToSplitSample = new LinkedHashMap<String, SplitSample>(); + private Map<String, SplitSample> nameToSplitSample = new LinkedHashMap<String, SplitSample>(); // If this map task has a FileSinkOperator, and bucketing/sorting metadata can be // inferred about the data being written by that operator, these are mappings from the directory @@ -181,25 +182,25 @@ public class MapWork extends BaseWork { } @Explain(displayName = "Path -> Alias", explainLevels = { Level.EXTENDED }) - public LinkedHashMap<Path, ArrayList<String>> getPathToAliases() { + public Map<Path, List<String>> getPathToAliases() { // return pathToAliases; } - public void setPathToAliases(final LinkedHashMap<Path, ArrayList<String>> pathToAliases) { + public void setPathToAliases(Map<Path, List<String>> pathToAliases) { for (Path p : pathToAliases.keySet()) { StringInternUtils.internUriStringsInPath(p); } this.pathToAliases = pathToAliases; } - public void addPathToAlias(Path path, ArrayList<String> aliases){ + public void addPathToAlias(Path path, List<String> aliases){ StringInternUtils.internUriStringsInPath(path); pathToAliases.put(path, aliases); } public void addPathToAlias(Path path, String newAlias){ - ArrayList<String> aliases = pathToAliases.get(path); + List<String> aliases = pathToAliases.get(path); if (aliases == null) { aliases = new ArrayList<>(1); StringInternUtils.internUriStringsInPath(path); @@ -225,26 +226,25 @@ public class MapWork extends BaseWork { * @return */ @Explain(displayName = "Truncated Path -> Alias", explainLevels = { Level.EXTENDED }) - public Map<String, ArrayList<String>> getTruncatedPathToAliases() { - Map<String, ArrayList<String>> trunPathToAliases = new LinkedHashMap<String, - ArrayList<String>>(); - Iterator<Entry<Path, ArrayList<String>>> itr = this.pathToAliases.entrySet().iterator(); + public Map<String, List<String>> getTruncatedPathToAliases() { + Map<String, List<String>> trunPathToAliases = new LinkedHashMap<String, List<String>>(); + Iterator<Entry<Path, List<String>>> itr = this.pathToAliases.entrySet().iterator(); while (itr.hasNext()) { - final Entry<Path, ArrayList<String>> entry = itr.next(); + Entry<Path, List<String>> entry = itr.next(); Path origiKey = entry.getKey(); String newKey = PlanUtils.removePrefixFromWarehouseConfig(origiKey.toString()); - ArrayList<String> value = entry.getValue(); + List<String> value = entry.getValue(); trunPathToAliases.put(newKey, value); } return trunPathToAliases; } @Explain(displayName = "Path -> Partition", explainLevels = { Level.EXTENDED }) - public LinkedHashMap<Path, PartitionDesc> getPathToPartitionInfo() { + public Map<Path, PartitionDesc> getPathToPartitionInfo() { return pathToPartitionInfo; } - public void setPathToPartitionInfo(final LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo) { + public void setPathToPartitionInfo(final Map<Path, PartitionDesc> pathToPartitionInfo) { for (Path p : pathToPartitionInfo.keySet()) { StringInternUtils.internUriStringsInPath(p); } @@ -364,7 +364,7 @@ public class MapWork extends BaseWork { /** * @return the aliasToPartnInfo */ - public LinkedHashMap<String, PartitionDesc> getAliasToPartnInfo() { + public Map<String, PartitionDesc> getAliasToPartnInfo() { return aliasToPartnInfo; } @@ -377,17 +377,16 @@ public class MapWork extends BaseWork { this.aliasToPartnInfo = aliasToPartnInfo; } - public LinkedHashMap<String, Operator<? extends OperatorDesc>> getAliasToWork() { + public Map<String, Operator<? extends OperatorDesc>> getAliasToWork() { return aliasToWork; } - public void setAliasToWork( - final LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork) { + public void setAliasToWork(Map<String, Operator<? extends OperatorDesc>> aliasToWork) { this.aliasToWork = aliasToWork; } @Explain(displayName = "Split Sample", explainLevels = { Level.EXTENDED }) - public HashMap<String, SplitSample> getNameToSplitSample() { + public Map<String, SplitSample> getNameToSplitSample() { return nameToSplitSample; } @@ -417,7 +416,7 @@ public class MapWork extends BaseWork { public void addMapWork(Path path, String alias, Operator<?> work, PartitionDesc pd) { StringInternUtils.internUriStringsInPath(path); - ArrayList<String> curAliases = pathToAliases.get(path); + List<String> curAliases = pathToAliases.get(path); if (curAliases == null) { assert (pathToPartitionInfo.get(path) == null); curAliases = new ArrayList<>(); @@ -450,7 +449,7 @@ public class MapWork extends BaseWork { } public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path, - TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) { + TableDesc tblDesc, List<String> aliases, PartitionDesc partDesc) { StringInternUtils.internUriStringsInPath(path); pathToAliases.put(path, aliases); pathToPartitionInfo.put(path, partDesc); @@ -522,7 +521,7 @@ public class MapWork extends BaseWork { public void mergeAliasedInput(String alias, Path pathDir, PartitionDesc partitionInfo) { StringInternUtils.internUriStringsInPath(pathDir); alias = alias.intern(); - ArrayList<String> aliases = pathToAliases.get(pathDir); + List<String> aliases = pathToAliases.get(pathDir); if (aliases == null) { aliases = new ArrayList<>(Arrays.asList(alias)); pathToAliases.put(pathDir, aliases); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java index a946b4f..5f55ceb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java @@ -26,12 +26,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -382,10 +382,8 @@ public class TestGetInputSummary { context.addCS(partitionPath.toString(), entry.getValue()); } - LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = - new LinkedHashMap<>(); - LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = - new LinkedHashMap<>(); + Map<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>(); + Map<Path, List<String>> pathToAliasTable = new LinkedHashMap<>(); TableScanOperator scanOp = new TableScanOperator(); PartitionDesc partitionDesc = new PartitionDesc( diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index c7cd4ad..a363b22 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -299,10 +299,10 @@ public class TestOperators extends TestCase { new Path("hdfs:///testDir/testFile")); // initialize pathToAliases - ArrayList<String> aliases = new ArrayList<String>(); + List<String> aliases = new ArrayList<String>(); aliases.add("a"); aliases.add("b"); - LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); + Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); pathToAliases.put(new Path("hdfs:///testDir"), aliases); // initialize pathToTableInfo diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java index 3aaf561..d8e4347 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java @@ -22,6 +22,8 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -64,7 +66,7 @@ public class TestPlan extends TestCase { ArrayList<String> aliasList = new ArrayList<String>(); aliasList.add("a"); - LinkedHashMap<Path, ArrayList<String>> pa = new LinkedHashMap<>(); + Map<Path, List<String>> pa = new LinkedHashMap<>(); pa.put(new Path("/tmp/testfolder"), aliasList); TableDesc tblDesc = Utilities.defaultTd; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index 2d48449..fbf948c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -366,7 +366,7 @@ public class TestUtilities { assertEquals(mapWork.getPathToPartitionInfo().size(), numPartitions); assertEquals(mapWork.getAliasToWork().size(), numPartitions); - for (Map.Entry<Path, ArrayList<String>> entry : mapWork.getPathToAliases().entrySet()) { + for (Map.Entry<Path, List<String>> entry : mapWork.getPathToAliases().entrySet()) { assertNotNull(entry.getKey()); assertNotNull(entry.getValue()); assertEquals(entry.getValue().size(), 1); @@ -485,7 +485,7 @@ public class TestUtilities { MapWork mapWork = new MapWork(); Path scratchDir = new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR)); - LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = new LinkedHashMap<>(); + Map<Path, List<String>> pathToAliasTable = new LinkedHashMap<>(); String testTableName = "testTable"; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index b67aec3..befeb4f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -133,14 +133,14 @@ public class TestTezTask { op = mock(Operator.class); - LinkedHashMap<String, Operator<? extends OperatorDesc>> map + Map<String, Operator<? extends OperatorDesc>> map = new LinkedHashMap<String,Operator<? extends OperatorDesc>>(); map.put("foo", op); mws[0].setAliasToWork(map); mws[1].setAliasToWork(map); - LinkedHashMap<Path, ArrayList<String>> pathMap = new LinkedHashMap<>(); - ArrayList<String> aliasList = new ArrayList<String>(); + Map<Path, List<String>> pathMap = new LinkedHashMap<>(); + List<String> aliasList = new ArrayList<String>(); aliasList.add("foo"); pathMap.put(new Path("foo"), aliasList); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 9a8ae3b..b5958fa 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -2201,8 +2201,8 @@ public class TestInputOutputFormat { mapWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx); } mapWork.setUseBucketizedHiveInputFormat(false); - LinkedHashMap<Path, ArrayList<String>> aliasMap = new LinkedHashMap<>(); - ArrayList<String> aliases = new ArrayList<String>(); + Map<Path, List<String>> aliasMap = new LinkedHashMap<>(); + List<String> aliases = new ArrayList<String>(); aliases.add(tableName); LinkedHashMap<Path, PartitionDesc> partMap = new LinkedHashMap<>(); for(int p=0; p < partitions; ++p) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java index 3a8b5e7..3fc82ad 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java @@ -32,6 +32,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Set; public class TestConditionalResolverCommonJoin { @@ -40,7 +42,7 @@ public class TestConditionalResolverCommonJoin { public void testResolvingDriverAlias() throws Exception { ConditionalResolverCommonJoin resolver = new ConditionalResolverCommonJoin(); - HashMap<Path, ArrayList<String>> pathToAliases = new HashMap<>(); + Map<Path, List<String>> pathToAliases = new HashMap<>(); pathToAliases.put(new Path("path1"), new ArrayList<String>(Arrays.asList("alias1", "alias2"))); pathToAliases.put(new Path("path2"), new ArrayList<String>(Arrays.asList("alias3"))); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java index 1756711..3e0d834 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java @@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.plan; import static org.junit.Assert.assertEquals; -import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.junit.Test; @@ -31,11 +32,11 @@ public class TestMapWork { @Test public void testGetAndSetConsistency() { MapWork mw = new MapWork(); - LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); + Map<Path, List<String>> pathToAliases = new LinkedHashMap<>(); pathToAliases.put(new Path("p0"), Lists.newArrayList("a1", "a2")); mw.setPathToAliases(pathToAliases); - LinkedHashMap<Path, ArrayList<String>> pta = mw.getPathToAliases(); + Map<Path, List<String>> pta = mw.getPathToAliases(); assertEquals(pathToAliases, pta); } diff --git a/ql/src/test/results/clientpositive/llap/orc_merge10.q.out b/ql/src/test/results/clientpositive/llap/orc_merge10.q.out index 3af8190..7b69b39 100644 --- a/ql/src/test/results/clientpositive/llap/orc_merge10.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_merge10.q.out @@ -632,7 +632,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: ds 1 part 0 table name: default.orcfile_merge1 diff --git a/ql/src/test/results/clientpositive/llap/orc_merge6.q.out b/ql/src/test/results/clientpositive/llap/orc_merge6.q.out index 95cbe1e..7021220 100644 --- a/ql/src/test/results/clientpositive/llap/orc_merge6.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_merge6.q.out @@ -522,7 +522,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: hour 24 year 2000 table name: default.orc_merge5a_n1 diff --git a/ql/src/test/results/clientpositive/llap/orc_merge7.q.out b/ql/src/test/results/clientpositive/llap/orc_merge7.q.out index 4155dc6..1f7b7b7 100644 --- a/ql/src/test/results/clientpositive/llap/orc_merge7.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_merge7.q.out @@ -653,7 +653,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: st 80.0 table name: default.orc_merge5a_n0 diff --git a/ql/src/test/results/clientpositive/llap/orc_merge_incompat2.q.out b/ql/src/test/results/clientpositive/llap/orc_merge_incompat2.q.out index 48217cd..207541b 100644 --- a/ql/src/test/results/clientpositive/llap/orc_merge_incompat2.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_merge_incompat2.q.out @@ -338,7 +338,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: st 80.0 table name: default.orc_merge5a diff --git a/ql/src/test/results/clientpositive/orc_merge10.q.out b/ql/src/test/results/clientpositive/orc_merge10.q.out index e8ebd4e..1f70773 100644 --- a/ql/src/test/results/clientpositive/orc_merge10.q.out +++ b/ql/src/test/results/clientpositive/orc_merge10.q.out @@ -603,7 +603,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: ds 1 part 0 table name: default.orcfile_merge1 diff --git a/ql/src/test/results/clientpositive/orc_merge6.q.out b/ql/src/test/results/clientpositive/orc_merge6.q.out index a07cfd3..bc05d2f 100644 --- a/ql/src/test/results/clientpositive/orc_merge6.q.out +++ b/ql/src/test/results/clientpositive/orc_merge6.q.out @@ -486,7 +486,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: hour 24 year 2000 table name: default.orc_merge5a_n1 diff --git a/ql/src/test/results/clientpositive/orc_merge_incompat2.q.out b/ql/src/test/results/clientpositive/orc_merge_incompat2.q.out index 71ae287..177bc25 100644 --- a/ql/src/test/results/clientpositive/orc_merge_incompat2.q.out +++ b/ql/src/test/results/clientpositive/orc_merge_incompat2.q.out @@ -333,7 +333,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: st 80.0 table name: default.orc_merge5a diff --git a/ql/src/test/results/clientpositive/spark/orc_merge6.q.out b/ql/src/test/results/clientpositive/spark/orc_merge6.q.out index 982b614..b3d1ca4 100644 --- a/ql/src/test/results/clientpositive/spark/orc_merge6.q.out +++ b/ql/src/test/results/clientpositive/spark/orc_merge6.q.out @@ -422,7 +422,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: hour 24 year 2000 table name: default.orc_merge5a_n1 diff --git a/ql/src/test/results/clientpositive/spark/orc_merge7.q.out b/ql/src/test/results/clientpositive/spark/orc_merge7.q.out index a641ed7..0c2b8a0 100644 --- a/ql/src/test/results/clientpositive/spark/orc_merge7.q.out +++ b/ql/src/test/results/clientpositive/spark/orc_merge7.q.out @@ -553,7 +553,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: st 80.0 table name: default.orc_merge5a_n0 diff --git a/ql/src/test/results/clientpositive/spark/orc_merge_incompat2.q.out b/ql/src/test/results/clientpositive/spark/orc_merge_incompat2.q.out index 1bfbf4d..8da08bf 100644 --- a/ql/src/test/results/clientpositive/spark/orc_merge_incompat2.q.out +++ b/ql/src/test/results/clientpositive/spark/orc_merge_incompat2.q.out @@ -294,7 +294,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Concatenate - partition desc: + partition spec: st 80.0 table name: default.orc_merge5a diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java index c2ba3b0..d75d709 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java @@ -108,15 +108,11 @@ public class Msck { boolean success = false; long txnId = -1; int ret = 0; + long partitionExpirySeconds = msckInfo.getPartitionExpirySeconds(); try { Table table = getMsc().getTable(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName()); qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); - if (getConf().getBoolean(MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION.getHiveName(), false)) { - msckInfo.setPartitionExpirySeconds(PartitionManagementTask.getRetentionPeriodInSeconds(table)); - LOG.info("{} - Retention period ({}s) for partition is enabled for MSCK REPAIR..", - qualifiedTableName, msckInfo.getPartitionExpirySeconds()); - } - HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), msckInfo.getPartitionExpirySeconds()); + HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), partitionExpirySeconds); // checkMetastore call will fill in result with partitions that are present in filesystem // and missing in metastore - accessed through getPartitionsNotInMs // And partitions that are not present in filesystem and metadata exists in metastore - @@ -253,7 +249,7 @@ public class Msck { firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(), "Partitions missing from filesystem:", resultOut, firstWritten); firstWritten |= writeMsckResult(result.getExpiredPartitions(), - "Expired partitions (retention period: " + msckInfo.getPartitionExpirySeconds() + "s) :", resultOut, firstWritten); + "Expired partitions (retention period: " + partitionExpirySeconds + "s) :", resultOut, firstWritten); // sorting to stabilize qfile output (msck_repair_drop.q) Collections.sort(repairOutput); for (String rout : repairOutput) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java index 81bcb56..25d0c64 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java @@ -17,29 +17,27 @@ */ package org.apache.hadoop.hive.metastore; -import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; /** * Metadata related to Msck. */ public class MsckInfo { - private String catalogName; - private String dbName; - private String tableName; - private ArrayList<LinkedHashMap<String, String>> partSpecs; - private String resFile; - private boolean repairPartitions; - private boolean addPartitions; - private boolean dropPartitions; - private long partitionExpirySeconds; - - public MsckInfo(final String catalogName, final String dbName, final String tableName, - final ArrayList<LinkedHashMap<String, String>> partSpecs, final String resFile, final boolean repairPartitions, - final boolean addPartitions, - final boolean dropPartitions, - final long partitionExpirySeconds) { + private final String catalogName; + private final String dbName; + private final String tableName; + private final List<Map<String, String>> partSpecs; + private final String resFile; + private final boolean repairPartitions; + private final boolean addPartitions; + private final boolean dropPartitions; + private final long partitionExpirySeconds; + + public MsckInfo(String catalogName, String dbName, String tableName, List<Map<String, String>> partSpecs, + String resFile, boolean repairPartitions, boolean addPartitions, boolean dropPartitions, + long partitionExpirySeconds) { this.catalogName = catalogName; this.dbName = dbName; this.tableName = tableName; @@ -55,71 +53,35 @@ public class MsckInfo { return catalogName; } - public void setCatalogName(final String catalogName) { - this.catalogName = catalogName; - } - public String getDbName() { return dbName; } - public void setDbName(final String dbName) { - this.dbName = dbName; - } - public String getTableName() { return tableName; } - public void setTableName(final String tableName) { - this.tableName = tableName; - } - - public ArrayList<LinkedHashMap<String, String>> getPartSpecs() { + public List<Map<String, String>> getPartSpecs() { return partSpecs; } - public void setPartSpecs(final ArrayList<LinkedHashMap<String, String>> partSpecs) { - this.partSpecs = partSpecs; - } - public String getResFile() { return resFile; } - public void setResFile(final String resFile) { - this.resFile = resFile; - } - public boolean isRepairPartitions() { return repairPartitions; } - public void setRepairPartitions(final boolean repairPartitions) { - this.repairPartitions = repairPartitions; - } - public boolean isAddPartitions() { return addPartitions; } - public void setAddPartitions(final boolean addPartitions) { - this.addPartitions = addPartitions; - } - public boolean isDropPartitions() { return dropPartitions; } - public void setDropPartitions(final boolean dropPartitions) { - this.dropPartitions = dropPartitions; - } - public long getPartitionExpirySeconds() { return partitionExpirySeconds; } - - public void setPartitionExpirySeconds(final long partitionExpirySeconds) { - this.partitionExpirySeconds = partitionExpirySeconds; - } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index 59001b5..da0259c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -163,7 +163,7 @@ public class PartitionManagementTask implements MetastoreTaskThread { } } - static long getRetentionPeriodInSeconds(final Table table) { + public static long getRetentionPeriodInSeconds(final Table table) { String retentionPeriod; long retentionSeconds = -1; if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
