Author: gunther
Date: Fri Sep 5 20:16:08 2014
New Revision: 1622783
URL: http://svn.apache.org/r1622783
Log:
HIVE-7976: Merge tez branch into trunk (tez 0.5.0) (Gopal V via Gunther Hagleitner)
Added:
hive/trunk/data/files/agg_01-p1.txt
- copied unchanged from r1622766, hive/branches/tez/data/files/agg_01-p1.txt
hive/trunk/data/files/agg_01-p2.txt
- copied unchanged from r1622766, hive/branches/tez/data/files/agg_01-p2.txt
hive/trunk/data/files/agg_01-p3.txt
- copied unchanged from r1622766, hive/branches/tez/data/files/agg_01-p3.txt
hive/trunk/data/files/dim_shops.txt
- copied unchanged from r1622766, hive/branches/tez/data/files/dim_shops.txt
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AppMasterEventProcessor.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/AppMasterEventProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
- copied unchanged from r1622766, hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
hive/trunk/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q
- copied unchanged from r1622766, hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q
hive/trunk/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
- copied unchanged from r1622766, hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
hive/trunk/ql/src/test/queries/clientpositive/tez_bmj_schema_evolution.q
- copied unchanged from r1622766, hive/branches/tez/ql/src/test/queries/clientpositive/tez_bmj_schema_evolution.q
hive/trunk/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning.q.out
- copied unchanged from r1622766, hive/branches/tez/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out
- copied unchanged from r1622766, hive/branches/tez/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/tez_bmj_schema_evolution.q.out
- copied unchanged from r1622766, hive/branches/tez/ql/src/test/results/clientpositive/tez/tez_bmj_schema_evolution.q.out
Modified:
hive/trunk/ (props changed)
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/hbase-handler/pom.xml (props changed)
hive/trunk/itests/src/test/resources/testconfiguration.properties
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
hive/trunk/pom.xml
hive/trunk/ql/if/queryplan.thrift
hive/trunk/ql/pom.xml
hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
hive/trunk/ql/src/gen/thrift/gen-php/Types.php
hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
hive/trunk/ql/src/test/results/clientpositive/optimize_nullscan.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/bucket2.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/bucket3.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/bucket4.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/optimize_nullscan.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/select_dummy_source.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/temp_table.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/vector_string_concat.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/vectorization_short_regress.q.out
hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
Propchange: hive/trunk/
------------------------------------------------------------------------------
Merged /hive/branches/tez:r1573250-1622766
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Sep 5 20:16:08 2014
@@ -374,7 +374,6 @@ public class HiveConf extends Configurat
METASTORECONNECTURLKEY("javax.jdo.option.ConnectionURL",
"jdbc:derby:;databaseName=metastore_db;create=true",
"JDBC connect string for a JDBC metastore"),
-
HMSHANDLERATTEMPTS("hive.hmshandler.retry.attempts", 1,
"The number of times to retry a HMSHandler call if there were a
connection error."),
HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", "1000ms",
@@ -1782,7 +1781,15 @@ public class HiveConf extends Configurat
"When auto reducer parallelism is enabled this factor will be used to
over-partition data in shuffle edges."),
TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f,
"When auto reducer parallelism is enabled this factor will be used to
put a lower limit to the number\n" +
- "of reducers that tez specifies.")
+ "of reducers that tez specifies."),
+ TEZ_DYNAMIC_PARTITION_PRUNING(
+ "hive.tez.dynamic.partition.pruning", true,
+ "When dynamic pruning is enabled, joins on partition keys will be
processed by sending events from the processing " +
+ "vertices to the tez application master. These events will be used to
prune unnecessary partitions."),
+
TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE("hive.tez.dynamic.partition.pruning.max.event.size",
1*1024*1024L,
+ "Maximum size of events sent by processors in dynamic pruning. If this
size is crossed no pruning will take place."),
+
TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE("hive.tez.dynamic.parition.pruning.max.data.size",
100*1024*1024L,
+ "Maximum total data size of events in dynamic pruning.")
;
public final String varname;
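For orientation, here is a minimal sketch of reading the three new settings through the HiveConf accessors. The ConfVars names are the ones added in the hunk above; the driver class itself is illustrative only and not part of this commit:

    import org.apache.hadoop.hive.conf.HiveConf;

    class PruningConfCheck {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // ConfVars constants come from the HiveConf hunk above;
        // getBoolVar/getLongVar are the standard HiveConf accessors.
        boolean enabled =
            conf.getBoolVar(HiveConf.ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING);
        long maxEventSize =
            conf.getLongVar(HiveConf.ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE);
        long maxDataSize =
            conf.getLongVar(HiveConf.ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE);
        // Per the descriptions above, events larger than maxEventSize (or
        // exceeding maxDataSize in total) cause pruning to be skipped.
        System.out.println(enabled + " " + maxEventSize + " " + maxDataSize);
      }
    }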
Propchange: hive/trunk/hbase-handler/pom.xml
------------------------------------------------------------------------------
Merged /hive/branches/tez/hbase-handler/pom.xml:r1573250-1622766
Merged /hive/trunk/hbase-handler/pom.xml:r1494760-1537575
Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Fri Sep 5 20:16:08 2014
@@ -140,8 +140,11 @@ minitez.query.files.shared=alter_merge_2
minitez.query.files=bucket_map_join_tez1.q,\
bucket_map_join_tez2.q,\
+ dynamic_partition_pruning.q,\
+ dynamic_partition_pruning_2.q,\
mapjoin_decimal.q,\
mrr.q,\
+ tez_bmj_schema_evolution.q,\
tez_dml.q,\
tez_fsstat.q,\
tez_insert_overwrite_local_directory_1.q,\
Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Fri Sep 5 20:16:08 2014
@@ -43,7 +43,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -64,10 +63,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.io.CachingPrintStream;
import org.apache.hadoop.hive.common.io.DigestPrintStream;
import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
import org.apache.hadoop.hive.common.io.SortPrintStream;
-import org.apache.hadoop.hive.common.io.CachingPrintStream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -75,8 +74,6 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.util.AllVectorTypesRecord;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -87,22 +84,14 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
-import org.apache.hadoop.hive.serde2.thrift.test.Complex;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Shell;
import org.apache.hive.common.util.StreamPrinter;
-import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.tools.ant.BuildException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assume;
import com.google.common.collect.ImmutableList;
@@ -145,8 +134,8 @@ public class QTestUtil {
private QTestSetup setup = null;
private boolean isSessionStateStarted = false;
- private String initScript;
- private String cleanupScript;
+ private final String initScript;
+ private final String cleanupScript;
static {
for (String srcTable : System.getProperty("test.src.tables", "").trim().split(",")) {
@@ -332,14 +321,6 @@ public class QTestUtil {
HadoopShims shims = ShimLoader.getHadoopShims();
int numberOfDataNodes = 4;
- // can run tez tests only on hadoop 2
- if (clusterType == MiniClusterType.tez) {
- Assume.assumeTrue(ShimLoader.getMajorVersion().equals("0.23"));
- // this is necessary temporarily - there's a probem with multi datanodes on MiniTezCluster
- // will be fixed in 0.3
- numberOfDataNodes = 1;
- }
-
if (clusterType != MiniClusterType.none) {
dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
FileSystem fs = dfs.getFileSystem();
Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Fri Sep 5 20:16:08 2014
@@ -150,8 +150,8 @@
<stax.version>1.0.1</stax.version>
<slf4j.version>1.7.5</slf4j.version>
<ST4.version>4.0.4</ST4.version>
+ <tez.version>0.5.0</tez.version>
<super-csv.version>2.2.0</super-csv.version>
- <tez.version>0.4.1-incubating</tez.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
<snappy.version>0.2</snappy.version>
<wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>
Modified: hive/trunk/ql/if/queryplan.thrift
URL: http://svn.apache.org/viewvc/hive/trunk/ql/if/queryplan.thrift?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/if/queryplan.thrift (original)
+++ hive/trunk/ql/if/queryplan.thrift Fri Sep 5 20:16:08 2014
@@ -56,6 +56,7 @@ enum OperatorType {
PTF,
MUX,
DEMUX,
+ EVENT,
}
struct Operator {
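For reference, a small sketch of how the regenerated Java enum resolves the new value. findByValue is the generated thrift method shown in the OperatorType.java hunk further below; the wrapper class here is illustrative only:

    import org.apache.hadoop.hive.ql.plan.api.OperatorType;

    class EventEnumCheck {
      public static void main(String[] args) {
        // EVENT was appended as value 21; findByValue maps the wire integer
        // back to the enum constant (and returns null for unknown values).
        OperatorType t = OperatorType.findByValue(21);
        System.out.println(t == OperatorType.EVENT); // true
      }
    }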
Modified: hive/trunk/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/pom.xml?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/trunk/ql/pom.xml Fri Sep 5 20:16:08 2014
@@ -297,6 +297,38 @@
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <version>${tez.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
<artifactId>tez-mapreduce</artifactId>
<version>${tez.version}</version>
<optional>true</optional>
Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp Fri Sep 5 20:16:08 2014
@@ -51,7 +51,8 @@ int _kOperatorTypeValues[] = {
OperatorType::HASHTABLEDUMMY,
OperatorType::PTF,
OperatorType::MUX,
- OperatorType::DEMUX
+ OperatorType::DEMUX,
+ OperatorType::EVENT
};
const char* _kOperatorTypeNames[] = {
"JOIN",
@@ -74,9 +75,10 @@ const char* _kOperatorTypeNames[] = {
"HASHTABLEDUMMY",
"PTF",
"MUX",
- "DEMUX"
+ "DEMUX",
+ "EVENT"
};
-const std::map<int, const char*>
_OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21,
_kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1,
NULL, NULL));
+const std::map<int, const char*>
_OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22,
_kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1,
NULL, NULL));
int _kTaskTypeValues[] = {
TaskType::MAP,
Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h Fri Sep 5 20:16:08 2014
@@ -56,7 +56,8 @@ struct OperatorType {
HASHTABLEDUMMY = 17,
PTF = 18,
MUX = 19,
- DEMUX = 20
+ DEMUX = 20,
+ EVENT = 21
};
};
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Fri Sep 5 20:16:08 2014
@@ -32,7 +32,8 @@ public enum OperatorType implements org.
HASHTABLEDUMMY(17),
PTF(18),
MUX(19),
- DEMUX(20);
+ DEMUX(20),
+ EVENT(21);
private final int value;
@@ -95,6 +96,8 @@ public enum OperatorType implements org.
return MUX;
case 20:
return DEMUX;
+ case 21:
+ return EVENT;
default:
return null;
}
Modified: hive/trunk/ql/src/gen/thrift/gen-php/Types.php
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-php/Types.php?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-php/Types.php (original)
+++ hive/trunk/ql/src/gen/thrift/gen-php/Types.php Fri Sep 5 20:16:08 2014
@@ -56,6 +56,7 @@ final class OperatorType {
const PTF = 18;
const MUX = 19;
const DEMUX = 20;
+ const EVENT = 21;
static public $__names = array(
0 => 'JOIN',
1 => 'MAPJOIN',
@@ -78,6 +79,7 @@ final class OperatorType {
18 => 'PTF',
19 => 'MUX',
20 => 'DEMUX',
+ 21 => 'EVENT',
);
}
Modified: hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py (original)
+++ hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py Fri Sep 5 20:16:08 2014
@@ -66,6 +66,7 @@ class OperatorType:
PTF = 18
MUX = 19
DEMUX = 20
+ EVENT = 21
_VALUES_TO_NAMES = {
0: "JOIN",
@@ -89,6 +90,7 @@ class OperatorType:
18: "PTF",
19: "MUX",
20: "DEMUX",
+ 21: "EVENT",
}
_NAMES_TO_VALUES = {
@@ -113,6 +115,7 @@ class OperatorType:
"PTF": 18,
"MUX": 19,
"DEMUX": 20,
+ "EVENT": 21,
}
class TaskType:
Modified: hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb (original)
+++ hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb Fri Sep 5 20:16:08 2014
@@ -42,8 +42,9 @@ module OperatorType
PTF = 18
MUX = 19
DEMUX = 20
- VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4
=> "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 =>
"TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF",
14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17
=> "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX"}
- VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY,
LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF,
LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX,
DEMUX]).freeze
+ EVENT = 21
+ VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4
=> "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 =>
"TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF",
14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17
=> "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT"}
+ VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY,
LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF,
LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX,
DEMUX, EVENT]).freeze
end
module TaskType
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Sep 5 20:16:08 2014
@@ -104,6 +104,8 @@ public class MapJoinOperator extends Abs
cache = ObjectCacheFactory.getCache(hconf);
loader = HashTableLoaderFactory.getLoader(hconf);
+ hashMapRowGetters = null;
+
mapJoinTables = (MapJoinTableContainer[]) cache.retrieve(tableKey);
mapJoinTableSerdes = (MapJoinTableContainerSerDe[]) cache.retrieve(serdeKey);
hashTblInitedOnce = true;
@@ -186,7 +188,7 @@ public class MapJoinOperator extends Abs
* process different buckets and if the container is reused to join a different bucket,
* join results can be incorrect. The cache is keyed on operator id and for bucket map join
* the operator does not change but data needed is different. For a proper fix, this
- * requires changes in the Tez API with regard to finding bucket id and
+ * requires changes in the Tez API with regard to finding bucket id and
* also ability to schedule tasks to re-use containers that have cached the specific bucket.
*/
LOG.info("This is not bucket map join, so cache");
@@ -237,7 +239,7 @@ public class MapJoinOperator extends Abs
firstRow = false;
}
- alias = (byte)tag;
+ alias = (byte) tag;
if (hashMapRowGetters == null) {
hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
MapJoinKey refKey = getRefKey(alias);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Fri Sep 5 20:16:08 2014
@@ -29,13 +29,15 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hive.ql.plan.Un
* OperatorFactory.
*
*/
+@SuppressWarnings({ "rawtypes", "unchecked" })
public final class OperatorFactory {
private static final List<OpTuple> opvec;
private static final List<OpTuple> vectorOpvec;
@@ -101,6 +104,10 @@ public final class OperatorFactory {
DemuxOperator.class));
opvec.add(new OpTuple<MuxDesc>(MuxDesc.class,
MuxOperator.class));
+ opvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
+ AppMasterEventOperator.class));
+ opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
+ AppMasterEventOperator.class));
}
static {
@@ -119,9 +126,9 @@ public final class OperatorFactory {
private static final class OpTuple<T extends OperatorDesc> {
private final Class<T> descClass;
- private final Class<? extends Operator<T>> opClass;
+ private final Class<? extends Operator<?>> opClass;
- public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
+ public OpTuple(Class<T> descClass, Class<? extends Operator<?>> opClass) {
this.descClass = descClass;
this.opClass = opClass;
}
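The OpTuple change above loosens the operator class parameter from Operator<T> to Operator<?> so that two descriptors (AppMasterEventDesc and DynamicPruningEventDesc) can map to the same AppMasterEventOperator. A minimal, self-contained sketch of that registry pattern, with simplified stand-in names rather than the actual Hive classes:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative stand-ins for OperatorDesc/Operator.
    class Desc {}
    class EventDesc extends Desc {}
    class PruningEventDesc extends EventDesc {}
    class Op<T extends Desc> {}
    class EventOp extends Op<EventDesc> {}

    final class Registry {
      // Mirrors OpTuple: the op class is deliberately Op<?> so one operator
      // class can serve several descriptor types.
      static final class Tuple<T extends Desc> {
        final Class<T> descClass;
        final Class<? extends Op<?>> opClass;
        Tuple(Class<T> descClass, Class<? extends Op<?>> opClass) {
          this.descClass = descClass;
          this.opClass = opClass;
        }
      }
      static final List<Tuple<?>> opvec = new ArrayList<>();
      static {
        // Both descriptors route to the same operator, as in the hunk above.
        opvec.add(new Tuple<>(EventDesc.class, EventOp.class));
        opvec.add(new Tuple<>(PruningEventDesc.class, EventOp.class));
      }
    }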
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Sep 5 20:16:08 2014
@@ -820,10 +820,12 @@ public final class Utilities {
}
}
- public static Set<Operator<?>> cloneOperatorTree(Configuration conf,
Set<Operator<?>> roots) {
+ public static List<Operator<?>> cloneOperatorTree(Configuration conf,
List<Operator<?>> roots) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
serializePlan(roots, baos, conf, true);
- Set<Operator<?>> result = deserializePlan(new
ByteArrayInputStream(baos.toByteArray()),
+ @SuppressWarnings("unchecked")
+ List<Operator<?>> result =
+ deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
roots.getClass(), conf, true);
return result;
}
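cloneOperatorTree deep-copies a plan by serializing it to an in-memory buffer and deserializing it back; the change above only swaps the container type from Set to List. A generic sketch of that round-trip idiom using plain Java serialization (Hive itself routes through its own serializePlan/deserializePlan; this is an assumption-laden illustration, not the Hive code path):

    import java.io.*;

    final class CloneUtil {
      // Deep-copies a serializable object graph by a serialize/deserialize
      // round trip, the same idiom cloneOperatorTree uses for operator DAGs.
      @SuppressWarnings("unchecked")
      static <T extends Serializable> T deepCopy(T roots)
          throws IOException, ClassNotFoundException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
          out.writeObject(roots);
        }
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
          return (T) in.readObject();
        }
      }
    }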
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Fri Sep 5 20:16:08 2014
@@ -130,7 +130,7 @@ public class MapRedTask extends ExecDriv
runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
- if(!runningViaChild) {
+ if (!runningViaChild) {
// we are not running this mapred task via child jvm
// so directly invoke ExecDriver
return super.execute(driverContext);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Fri Sep 5 20:16:08 2014
@@ -14,10 +14,10 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.WriteBuffers;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -57,7 +57,7 @@ public class MapJoinBytesTableContainer
private boolean[] sortableSortOrders;
private KeyValueHelper writeHelper;
- private List<Object> EMPTY_LIST = new ArrayList<Object>(0);
+ private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
public MapJoinBytesTableContainer(Configuration hconf,
MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
@@ -476,6 +476,7 @@ public class MapJoinBytesTableContainer
return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
}
+ @Override
public void addRow(List<Object> t) {
if (dummyRow != null || !refs.isEmpty()) {
throw new RuntimeException("Cannot add rows when not empty");
@@ -484,9 +485,11 @@ public class MapJoinBytesTableContainer
}
// Various unsupported methods.
+ @Override
public void addRow(Object[] value) {
throw new RuntimeException(this.getClass().getCanonicalName() + " cannot add arrays");
}
+ @Override
public void write(MapJoinObjectSerDeContext valueContext,
ObjectOutputStream out) {
throw new RuntimeException(this.getClass().getCanonicalName() + " cannot be serialized");
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Fri Sep 5 20:16:08 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
import org.apache.hadoop.io.Writable;
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Fri Sep 5 20:16:08 2014
@@ -19,61 +19,62 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
-import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.dag.api.EdgeManager;
-import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import com.google.common.collect.Multimap;
-
-public class CustomPartitionEdge implements EdgeManager {
+public class CustomPartitionEdge extends EdgeManagerPlugin {
private static final Log LOG =
LogFactory.getLog(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration conf = null;
+ final EdgeManagerPluginContext context;
// used by the framework at runtime. initialize is the real initializer at runtime
- public CustomPartitionEdge() {
+ public CustomPartitionEdge(EdgeManagerPluginContext context) {
+ super(context);
+ this.context = context;
}
+
@Override
- public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
- int destinationTaskIndex) {
- return numSourceTasks;
+ public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+ return context.getSourceVertexNumTasks();
}
@Override
- public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
- int sourceTaskIndex) {
+ public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
return conf.getNumBuckets();
}
@Override
- public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
- return numDestinationTasks;
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return context.getDestinationVertexNumTasks();
}
// called at runtime to initialize the custom edge.
@Override
- public void initialize(EdgeManagerContext context) {
- byte[] payload = context.getUserPayload();
+ public void initialize() {
+ ByteBuffer payload = context.getUserPayload().getPayload();
LOG.info("Initializing the edge, payload: " + payload);
if (payload == null) {
throw new RuntimeException("Invalid payload");
}
// De-serialization code
- DataInputBuffer dib = new DataInputBuffer();
- dib.reset(payload, payload.length);
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(payload);
conf = new CustomEdgeConfiguration();
try {
- conf.readFields(dib);
+ conf.readFields(dibb);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -83,30 +84,25 @@ public class CustomPartitionEdge impleme
@Override
public void routeDataMovementEventToDestination(DataMovementEvent event,
- int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
- int srcIndex = event.getSourceIndex();
- List<Integer> destTaskIndices = new ArrayList<Integer>();
- destTaskIndices.addAll(conf.getRoutingTable().get(srcIndex));
- mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+ int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> mapDestTaskIndices) {
+ List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+ for (Integer destIndex : conf.getRoutingTable().get(sourceOutputIndex)) {
+ mapDestTaskIndices.put(destIndex, outputIndices);
+ }
}
@Override
- public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
- int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
- List<Integer> destTaskIndices = new ArrayList<Integer>();
- addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
- mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ Map<Integer, List<Integer>> mapDestTaskIndices) {
+ List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+ for (int i = 0; i < context.getDestinationVertexNumTasks(); i++) {
+ mapDestTaskIndices.put(i, outputIndices);
+ }
}
@Override
- public int routeInputErrorEventToSource(InputReadErrorEvent event,
- int destinationTaskIndex) {
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex, int destinationFailedInputIndex) {
return event.getIndex();
}
-
- void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
- for(int i=0; i<numDestinationTasks; ++i) {
- taskIndices.add(new Integer(i));
- }
- }
}
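The rewritten routeDataMovementEventToDestination above fans one source output (a bucket) out to every destination task listed in the routing table, each reading a single physical input. A small sketch of that routing logic using plain collections (CustomEdgeConfiguration's Guava Multimap simplified to a Map of lists; illustrative, not the Tez plugin API):

    import java.util.*;

    final class RoutingSketch {
      // routingTable: source output index (bucket) -> destination task indices.
      static Map<Integer, List<Integer>> route(int sourceTaskIndex, int sourceOutputIndex,
          Map<Integer, List<Integer>> routingTable) {
        // Each chosen destination reads exactly one physical input: the one
        // produced by this source task (mirrors Collections.singletonList above).
        Map<Integer, List<Integer>> destTaskToInputs = new HashMap<>();
        List<Integer> inputIndices = Collections.singletonList(sourceTaskIndex);
        for (Integer destIndex :
            routingTable.getOrDefault(sourceOutputIndex, Collections.emptyList())) {
          destTaskToInputs.put(destIndex, inputIndices);
        }
        return destTaskToInputs;
      }
    }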
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Fri Sep 5 20:16:08 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -30,27 +31,30 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import com.google.common.base.Preconditions;
@@ -59,40 +63,44 @@ import com.google.common.collect.HashMul
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
/*
* Only works with old mapred API
* Will only work with a single MRInput for now.
*/
-public class CustomPartitionVertex implements VertexManagerPlugin {
+public class CustomPartitionVertex extends VertexManagerPlugin {
private static final Log LOG =
LogFactory.getLog(CustomPartitionVertex.class.getName());
VertexManagerPluginContext context;
- private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
- private List<RootInputDataInformationEvent> dataInformationEvents;
+ private InputConfigureVertexTasksEvent configureVertexTaskEvent;
+ private List<InputDataInformationEvent> dataInformationEvents;
private int numBuckets = -1;
private Configuration conf = null;
private boolean rootVertexInitialized = false;
private final SplitGrouper grouper = new SplitGrouper();
+ private int taskCount = 0;
- public CustomPartitionVertex() {
+ public CustomPartitionVertex(VertexManagerPluginContext context) {
+ super(context);
}
@Override
- public void initialize(VertexManagerPluginContext context) {
- this.context = context;
- ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
+ public void initialize() {
+ this.context = getContext();
+ ByteBuffer byteBuf = context.getUserPayload().getPayload();
this.numBuckets = byteBuf.getInt();
}
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
int numTasks = context.getVertexNumTasks(context.getVertexName());
- List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+ List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
+ new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
for (int i = 0; i < numTasks; ++i) {
- scheduledTasks.add(new Integer(i));
+ scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
}
context.scheduleVertexTasks(scheduledTasks);
}
@@ -114,6 +122,7 @@ public class CustomPartitionVertex imple
// ensure this method is called only once. Tez will call it once per Root
// Input.
Preconditions.checkState(rootVertexInitialized == false);
+ LOG.info("Root vertex not initialized");
rootVertexInitialized = true;
try {
// This is using the payload from the RootVertexInitializer corresponding
@@ -121,8 +130,8 @@ public class CustomPartitionVertex imple
// but that
// means serializing another instance.
MRInputUserPayloadProto protoPayload =
- MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
- this.conf = MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes());
+ MRInputHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
+ this.conf = TezUtils.createConfFromByteString(protoPayload.getConfigurationBytes());
/*
* Currently in tez, the flow of events is thus:
@@ -138,30 +147,27 @@ public class CustomPartitionVertex imple
*/
// This assumes that Grouping will always be used.
- // Changing the InputFormat - so that the correct one is initialized in
- // MRInput.
- this.conf.set("mapred.input.format.class",
TezGroupedSplitsInputFormat.class.getName());
+ // Enabling grouping on the payload.
MRInputUserPayloadProto updatedPayload =
- MRInputUserPayloadProto.newBuilder(protoPayload)
- .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf)).build();
- inputDescriptor.setUserPayload(updatedPayload.toByteArray());
+ MRInputUserPayloadProto.newBuilder(protoPayload).setGroupingEnabled(true).build();
+ inputDescriptor.setUserPayload(UserPayload.create(updatedPayload.toByteString().asReadOnlyByteBuffer()));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
boolean dataInformationEventSeen = false;
- Map<Path, List<FileSplit>> pathFileSplitsMap = new TreeMap<Path,
List<FileSplit>>();
+ Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String,
List<FileSplit>>();
for (Event event : events) {
- if (event instanceof RootInputConfigureVertexTasksEvent) {
+ if (event instanceof InputConfigureVertexTasksEvent) {
// No tasks should have been started yet. Checked by initial state
// check.
Preconditions.checkState(dataInformationEventSeen == false);
Preconditions
.checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
"Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
- RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
+ InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
// The vertex cannot be configured until all DataEvents are seen - to
// build the routing table.
@@ -169,12 +175,12 @@ public class CustomPartitionVertex imple
dataInformationEvents =
Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
}
- if (event instanceof RootInputUpdatePayloadEvent) {
+ if (event instanceof InputUpdatePayloadEvent) {
// this event can never occur. If it does, fail.
Preconditions.checkState(false);
- } else if (event instanceof RootInputDataInformationEvent) {
+ } else if (event instanceof InputDataInformationEvent) {
dataInformationEventSeen = true;
- RootInputDataInformationEvent diEvent = (RootInputDataInformationEvent) event;
+ InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
dataInformationEvents.add(diEvent);
FileSplit fileSplit;
try {
@@ -182,10 +188,10 @@ public class CustomPartitionVertex imple
} catch (IOException e) {
throw new RuntimeException("Failed to get file split for event: " +
diEvent);
}
- List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath());
+ List<FileSplit> fsList =
pathFileSplitsMap.get(fileSplit.getPath().getName());
if (fsList == null) {
fsList = new ArrayList<FileSplit>();
- pathFileSplitsMap.put(fileSplit.getPath(), fsList);
+ pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
}
fsList.add(fileSplit);
}
@@ -195,21 +201,32 @@ public class CustomPartitionVertex imple
getBucketSplitMapForPath(pathFileSplitsMap);
try {
- int totalResource = context.getTotalAVailableResource().getMemory();
+ int totalResource = context.getTotalAvailableResource().getMemory();
int taskResource = context.getVertexTaskResource().getMemory();
float waves =
- conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+ conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+ TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
int availableSlots = totalResource / taskResource;
LOG.info("Grouping splits. " + availableSlots + " available slots, " +
waves + " waves.");
+ JobConf jobConf = new JobConf(conf);
+ ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
- grouper.group(conf, bucketToInitialSplitMap, availableSlots, waves);
+ HashMultimap.<Integer, InputSplit> create();
+ for (Integer key : bucketToInitialSplitMap.keySet()) {
+ InputSplit[] inputSplitArray =
+ (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+ Multimap<Integer, InputSplit> groupedSplit =
+ HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+ availableSlots);
+ bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+ }
+ LOG.info("We have grouped the splits into " +
bucketToGroupedSplitMap.size() + " tasks");
processAllEvents(inputName, bucketToGroupedSplitMap);
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -219,7 +236,6 @@ public class CustomPartitionVertex imple
Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer,
Integer> create();
List<InputSplit> finalSplits = Lists.newLinkedList();
- int taskCount = 0;
for (Entry<Integer, Collection<InputSplit>> entry :
bucketToGroupedSplitMap.asMap().entrySet()) {
int bucketNum = entry.getKey();
Collection<InputSplit> initialSplits = entry.getValue();
@@ -232,12 +248,12 @@ public class CustomPartitionVertex imple
// Construct the EdgeManager descriptor to be used by all edges which need
// the routing table.
- EdgeManagerDescriptor hiveEdgeManagerDesc =
- new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
- byte[] payload = getBytePayload(bucketToTaskMap);
+ EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
+ EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+ UserPayload payload = getBytePayload(bucketToTaskMap);
hiveEdgeManagerDesc.setUserPayload(payload);
- Map<String, EdgeManagerDescriptor> emMap = Maps.newHashMap();
+ Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
// Replace the edge manager for all vertices which have routing type custom.
for (Entry<String, EdgeProperty> edgeEntry :
context.getInputVertexEdgeProperties().entrySet()) {
@@ -250,47 +266,51 @@ public class CustomPartitionVertex imple
LOG.info("Task count is " + taskCount);
- List<RootInputDataInformationEvent> taskEvents =
+ List<InputDataInformationEvent> taskEvents =
Lists.newArrayListWithCapacity(finalSplits.size());
// Re-serialize the splits after grouping.
int count = 0;
for (InputSplit inputSplit : finalSplits) {
- MRSplitProto serializedSplit = MRHelpers.createSplitProto(inputSplit);
- RootInputDataInformationEvent diEvent =
- new RootInputDataInformationEvent(count, serializedSplit.toByteArray());
+ MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+ InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
+ count, serializedSplit.toByteString().asReadOnlyByteBuffer());
diEvent.setTargetIndex(count);
count++;
taskEvents.add(diEvent);
}
// Replace the Edge Managers
+ Map<String, InputSpecUpdate> rootInputSpecUpdate =
+ new HashMap<String, InputSpecUpdate>();
+ rootInputSpecUpdate.put(
+ inputName,
+ InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
context.setVertexParallelism(
taskCount,
- new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap);
+ VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+ .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
}
- private byte[] getBytePayload(Multimap<Integer, Integer> routingTable)
throws IOException {
+ UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws
IOException {
CustomEdgeConfiguration edgeConf =
new CustomEdgeConfiguration(routingTable.keySet().size(),
routingTable);
DataOutputBuffer dob = new DataOutputBuffer();
edgeConf.write(dob);
byte[] serialized = dob.getData();
-
- return serialized;
+ return UserPayload.create(ByteBuffer.wrap(serialized));
}
- private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) throws IOException {
+ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws IOException {
InputSplit inputSplit = null;
if (event.getDeserializedUserPayload() != null) {
inputSplit = (InputSplit) event.getDeserializedUserPayload();
} else {
- MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+ MRSplitProto splitProto =
MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
SerializationFactory serializationFactory = new SerializationFactory(new Configuration());
- inputSplit = MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
+ inputSplit = MRInputHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
}
if (!(inputSplit instanceof FileSplit)) {
@@ -304,7 +324,7 @@ public class CustomPartitionVertex imple
* This method generates the map of bucket to file splits.
*/
private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
- Map<Path, List<FileSplit>> pathFileSplitsMap) {
+ Map<String, List<FileSplit>> pathFileSplitsMap) {
int bucketNum = 0;
int fsCount = 0;
@@ -312,7 +332,7 @@ public class CustomPartitionVertex imple
Multimap<Integer, InputSplit> bucketToInitialSplitMap =
ArrayListMultimap.<Integer, InputSplit> create();
- for (Map.Entry<Path, List<FileSplit>> entry :
pathFileSplitsMap.entrySet()) {
+ for (Map.Entry<String, List<FileSplit>> entry :
pathFileSplitsMap.entrySet()) {
int bucketId = bucketNum % numBuckets;
for (FileSplit fsplit : entry.getValue()) {
fsCount++;
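getBucketSplitMapForPath assigns each distinct file a bucket id round-robin (bucketNum % numBuckets) and groups that file's splits under the bucket. A compact sketch of that assignment, with Guava's Multimap replaced by a plain Map and splits by strings; the per-entry bucketNum increment is an assumption, since the hunk is cut off above:

    import java.util.*;

    final class BucketSketch {
      // pathFileSplitsMap: file name -> that file's splits (strings here).
      static Map<Integer, List<String>> bucketize(int numBuckets,
          Map<String, List<String>> pathFileSplitsMap) {
        Map<Integer, List<String>> bucketToSplits = new HashMap<>();
        int bucketNum = 0;
        for (Map.Entry<String, List<String>> entry : pathFileSplitsMap.entrySet()) {
          int bucketId = bucketNum % numBuckets; // round-robin over files
          bucketToSplits.computeIfAbsent(bucketId, k -> new ArrayList<>())
              .addAll(entry.getValue());
          bucketNum++; // assumed: advance per file, as the modulo suggests
        }
        return bucketToSplits;
      }
    }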
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Sep 5 20:16:08 2014
@@ -71,7 +71,6 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -80,42 +79,43 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
+import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -163,6 +163,7 @@ public class DagUtils {
JobConf conf = new JobConf(baseConf);
if (mapWork.getNumMapTasks() != null) {
+ // Is this required?
conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
}
@@ -219,20 +220,19 @@ public class DagUtils {
* Edge between them.
*
* @param group The parent VertexGroup
- * @param wConf The job conf of the child vertex
+ * @param vConf The job conf of one of the parent (grouped) vertices
* @param w The child vertex
* @param edgeProp the edge property of connection between the two
* endpoints.
*/
@SuppressWarnings("rawtypes")
- public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
Vertex w, TezEdgeProperty edgeProp)
throws IOException {
Class mergeInputClass;
- LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
- w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+ LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getName());
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
@@ -243,9 +243,10 @@ public class DagUtils {
mergeInputClass = ConcatenatedMergedKeyValueInput.class;
int numBuckets = edgeProp.getNumBuckets();
VertexManagerPluginDescriptor desc =
- new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
- byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
- desc.setUserPayload(userPayload);
+ VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+ ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+ userPayload.flip();
+ desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
}
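The new flip() call is load-bearing: putInt leaves the buffer's position after the written bytes, and UserPayload.create hands the buffer to readers as-is. A small plain-java.nio illustration (numBuckets stands in for the real value):

    import java.nio.ByteBuffer;

    int numBuckets = 4;  // stand-in value
    ByteBuffer buf = ByteBuffer.allocate(4).putInt(numBuckets);
    // position == 4 here; a reader starting from this state sees 0 remaining bytes
    buf.flip();
    // position == 0, limit == 4: the int is now readable
    int roundTrip = buf.duplicate().getInt();  // == numBuckets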
@@ -263,47 +264,31 @@ public class DagUtils {
break;
}
- return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
- new InputDescriptor(mergeInputClass.getName()));
+ return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+ InputDescriptor.create(mergeInputClass.getName()));
}
/**
- * Given two vertices a, b update their configurations to be used in an Edge a-b
- */
- public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
- throws IOException {
-
- // Tez needs to setup output subsequent input pairs correctly
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
-
- // update payloads (configuration for the vertices might have changed)
- v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
- w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
- }
-
- /**
- * Given two vertices and their respective configuration objects createEdge
+ * Given two vertices and the configuration for the source vertex, createEdge
* will create an Edge object that connects the two.
*
- * @param vConf JobConf of the first vertex
+ * @param vConf JobConf of the first (source) vertex
* @param v The first vertex (source)
- * @param wConf JobConf of the second vertex
* @param w The second vertex (sink)
* @return
*/
- public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+ public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
TezEdgeProperty edgeProp)
throws IOException {
- updateConfigurationForEdge(vConf, v, wConf, w);
-
switch(edgeProp.getEdgeType()) {
case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
- byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
- VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+ ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+ userPayload.flip();
+ VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
+ CustomPartitionVertex.class.getName());
- desc.setUserPayload(userPayload);
+ desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
}
@@ -315,71 +300,92 @@ public class DagUtils {
// nothing
}
- return new Edge(v, w, createEdgeProperty(edgeProp));
+ return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
}
/*
* Helper function to create an edge property from an edge type.
*/
@SuppressWarnings("rawtypes")
- private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
- DataMovementType dataMovementType;
- Class logicalInputClass;
- Class logicalOutputClass;
+ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
+ throws IOException {
+ MRHelpers.translateMRConfToTez(conf);
+ String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+ String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+ String partitionerClassName = conf.get("mapred.partitioner.class");
+ Map<String, String> partitionerConf;
- EdgeProperty edgeProperty = null;
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
case BROADCAST_EDGE:
- dataMovementType = DataMovementType.BROADCAST;
- logicalOutputClass = OnFileUnorderedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
+ UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
+ .newBuilder(keyClass, valClass)
+ .setFromConfiguration(conf)
+ .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .build();
+ return et1Conf.createDefaultBroadcastEdgeProperty();
case CUSTOM_EDGE:
- dataMovementType = DataMovementType.CUSTOM;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- EdgeManagerDescriptor edgeDesc =
- new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf)
+ .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .build();
+ EdgeManagerPluginDescriptor edgeDesc =
+ EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration edgeConf =
new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
DataOutputBuffer dob = new DataOutputBuffer();
edgeConf.write(dob);
byte[] userPayload = dob.getData();
- edgeDesc.setUserPayload(userPayload);
- edgeProperty =
- new EdgeProperty(edgeDesc,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- new OutputDescriptor(logicalOutputClass.getName()),
- new InputDescriptor(logicalInputClass.getName()));
- break;
-
+ edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
+ return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
case CUSTOM_SIMPLE_EDGE:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf)
+ .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .build();
+ return et3Conf.createDefaultEdgeProperty();
case SIMPLE_EDGE:
default:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileSortedOutput.class;
- logicalInputClass = ShuffledMergedInputLegacy.class;
- break;
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf)
+ .setKeySerializationClass(TezBytesWritableSerialization.class.getName(),
+ TezBytesComparator.class.getName(), null)
+ .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .build();
+ return et4Conf.createDefaultEdgeProperty();
}
+ }
- if (edgeProperty == null) {
- edgeProperty =
- new EdgeProperty(dataMovementType,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- new OutputDescriptor(logicalOutputClass.getName()),
- new InputDescriptor(logicalInputClass.getName()));
+ /**
+ * Utility method to create a stripped down configuration for the MR partitioner.
+ *
+ * @param partitionerClassName
+ * the real MR partitioner class name
+ * @param baseConf
+ * a base configuration to extract relevant properties
+ * @return a minimal configuration map for the partitioner
+ */
+ private Map<String, String> createPartitionerConf(String partitionerClassName,
+ Configuration baseConf) {
+ Map<String, String> partitionerConf = new HashMap<String, String>();
+ partitionerConf.put("mapred.partitioner.class", partitionerClassName);
+ if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) {
+ partitionerConf.put("mapreduce.totalorderpartitioner.path",
+ baseConf.get("mapreduce.totalorderpartitioner.path"));
}
-
- return edgeProperty;
+ return partitionerConf;
}
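Taken together, createEdgeProperty and createPartitionerConf replace the old descriptor plumbing with Tez 0.5's edge-config builders. A hedged sketch of the SIMPLE_EDGE path in isolation; the Text/IntWritable types and HashPartitioner are placeholders, not what Hive actually configures:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.dag.api.Edge;
    import org.apache.tez.mapreduce.partition.MRPartitioner;
    import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;

    // Sketch only: a sorted scatter-gather edge between two existing
    // vertices v and w, built through the 0.5 builder API.
    Configuration conf = new Configuration();
    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder("org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable",
            MRPartitioner.class.getName(),
            createPartitionerConf("org.apache.hadoop.mapred.lib.HashPartitioner", conf))
        .setFromConfiguration(conf)
        .build();
    Edge e = Edge.create(v, w, edgeConf.createDefaultEdgeProperty());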
/*
@@ -397,6 +403,15 @@ public class DagUtils {
}
/*
+ * Helper to set up the default environment for a task in YARN.
+ */
+ private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
+ Map<String, String> environment = new HashMap<String, String>();
+ MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
+ return environment;
+ }
+
+ /*
* Helper to determine what java options to use for the containers
* Falls back to Map-reduce's map java opts if no tez-specific options
* are set
@@ -406,14 +421,14 @@ public class DagUtils {
if (javaOpts != null && !javaOpts.isEmpty()) {
String logLevel = HiveConf.getVar(conf,
HiveConf.ConfVars.HIVETEZLOGLEVEL);
List<String> logProps = Lists.newArrayList();
- MRHelpers.addLog4jSystemProperties(logLevel, logProps);
+ TezUtils.addLog4jSystemProperties(logLevel, logProps);
StringBuilder sb = new StringBuilder();
for (String str : logProps) {
sb.append(str).append(" ");
}
return javaOpts + " " + sb.toString();
}
- return MRHelpers.getMapJavaOpts(conf);
+ return MRHelpers.getJavaOptsForMRMapper(conf);
}
/*
@@ -431,18 +446,15 @@ public class DagUtils {
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, mapWork);
- // Tez asks us to call this even if there's no preceding vertex
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
// finally create the vertex
Vertex map = null;
// use tez to combine splits
- boolean useTezGroupedSplits = false;
+ boolean groupSplitsInInputInitializer;
+
+ DataSourceDescriptor dataSource;
int numTasks = -1;
- Class<HiveSplitGenerator> amSplitGeneratorClass = null;
- InputSplitInfo inputSplitInfo = null;
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
@@ -456,9 +468,9 @@ public class DagUtils {
}
if (vertexHasCustomInput) {
- useTezGroupedSplits = false;
- // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
- // here would cause premature grouping, which would be incorrect.
+ groupSplitsInInputInitializer = false;
+ // grouping happens in execution phase. The input payload should not enable grouping here;
+ // it will be enabled in the CustomVertex.
inputFormatClass = HiveInputFormat.class;
conf.setClass("mapred.input.format.class", HiveInputFormat.class,
InputFormat.class);
// mapreduce.tez.input.initializer.serialize.event.payload should be set to false when using
@@ -468,49 +480,52 @@ public class DagUtils {
// we'll set up tez to combine splits for us iff the input format
// is HiveInputFormat
if (inputFormatClass == HiveInputFormat.class) {
- useTezGroupedSplits = true;
- conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+ groupSplitsInInputInitializer = true;
+ } else {
+ groupSplitsInInputInitializer = false;
}
}
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
+
+ // set up the operator plan. (before setting up splits on the AM)
+ Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
// if we're generating the splits in the AM, we just need to set
// the correct plugin.
- amSplitGeneratorClass = HiveSplitGenerator.class;
+ if (groupSplitsInInputInitializer) {
+ // Not setting a payload, since the MRInput payload is the same and can be accessed.
+ InputInitializerDescriptor descriptor = InputInitializerDescriptor.create(
+ HiveSplitGenerator.class.getName());
+ dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(true)
+ .setCustomInitializerDescriptor(descriptor).build();
+ } else {
+ // Either this is not HiveInputFormat, or a custom VertexManager will take care of grouping the splits.
+ dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+ }
} else {
- // client side split generation means we have to compute them now
- inputSplitInfo = MRHelpers.generateInputSplits(conf,
- new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
- numTasks = inputSplitInfo.getNumTasks();
+ // Setup client side split generation.
+ dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
+ "split_" + mapWork.getName().replaceAll(" ", "_")), true);
+ numTasks = dataSource.getNumberOfShards();
+
+ // set up the operator plan. (after generating splits - that changes configs)
+ Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
}
- // set up the operator plan
- Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
- byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
- map = new Vertex(mapWork.getName(),
- new ProcessorDescriptor(MapTezProcessor.class.getName()).
+ UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
+ map = Vertex.create(mapWork.getName(),
+ ProcessorDescriptor.create(MapTezProcessor.class.getName()).
setUserPayload(serializedConf), numTasks, getContainerResource(conf));
- Map<String, String> environment = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
- map.setTaskEnvironment(environment);
- map.setJavaOpts(getContainerJavaOpts(conf));
+ map.setTaskEnvironment(getContainerEnvironment(conf, true));
+ map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
+ // Add the actual source input
String alias = mapWork.getAliasToWork().keySet().iterator().next();
-
- byte[] mrInput = null;
- if (useTezGroupedSplits) {
- mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
- HiveInputFormat.class.getName());
- } else {
- mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
- }
- map.addInput(alias,
- new InputDescriptor(MRInputLegacy.class.getName()).
- setUserPayload(mrInput), amSplitGeneratorClass);
+ map.addDataSource(alias, dataSource);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put(getBaseName(appJarLr), appJarLr);
@@ -518,14 +533,7 @@ public class DagUtils {
localResources.put(getBaseName(lr), lr);
}
- if (inputSplitInfo != null) {
- // only relevant for client-side split generation
- map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
- MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
- localResources);
- }
-
- map.setTaskLocalResources(localResources);
+ map.addTaskLocalFiles(localResources);
return map;
}
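The map-vertex wiring changes shape substantially in this hunk; a condensed sketch of the 0.5-style flow, assuming conf, numTasks, dataSource and localResources were prepared as above and reusing the DagUtils helpers (the vertex name and alias are illustrative):

    import org.apache.tez.common.TezUtils;
    import org.apache.tez.dag.api.ProcessorDescriptor;
    import org.apache.tez.dag.api.UserPayload;
    import org.apache.tez.dag.api.Vertex;

    // Sketch only: create the map vertex and attach its data source.
    UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
    Vertex map = Vertex.create("Map 1",
        ProcessorDescriptor.create(MapTezProcessor.class.getName()).setUserPayload(payload),
        numTasks, getContainerResource(conf));
    map.setTaskEnvironment(getContainerEnvironment(conf, true));
    map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
    map.addDataSource("alias", dataSource);  // replaces addInput + split-generator class
    map.addTaskLocalFiles(localResources);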
@@ -535,6 +543,7 @@ public class DagUtils {
private JobConf initializeVertexConf(JobConf baseConf, Context context,
ReduceWork reduceWork) {
JobConf conf = new JobConf(baseConf);
+ // Is this required?
conf.set("mapred.reducer.class", ExecReducer.class.getName());
boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
@@ -558,29 +567,22 @@ public class DagUtils {
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, reduceWork);
- // Call once here, will be updated when we find edges
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
// create the vertex
- Vertex reducer = new Vertex(reduceWork.getName(),
- new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
- setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+ Vertex reducer = Vertex.create(reduceWork.getName(),
+ ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
+ setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
reduceWork.isAutoReduceParallelism() ?
reduceWork.getMaxReduceTasks() : reduceWork
.getNumReduceTasks(), getContainerResource(conf));
- Map<String, String> environment = new HashMap<String, String>();
-
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
- reducer.setTaskEnvironment(environment);
-
- reducer.setJavaOpts(getContainerJavaOpts(conf));
+ reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
+ reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put(getBaseName(appJarLr), appJarLr);
for (LocalResource lr: additionalLr) {
localResources.put(getBaseName(lr), lr);
}
- reducer.setTaskLocalResources(localResources);
+ reducer.addTaskLocalFiles(localResources);
return reducer;
}
@@ -614,37 +616,29 @@ public class DagUtils {
}
/**
- * @param sessionConfig session configuration
* @param numContainers number of containers to pre-warm
* @param localResources additional resources to pre-warm with
- * @return prewarm context object
+ * @return prewarm vertex to run
*/
- public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
- Map<String, LocalResource> localResources) throws IOException, TezException {
-
- Configuration conf = sessionConfig.getTezConfiguration();
+ public PreWarmVertex createPreWarmVertex(TezConfiguration conf,
+ int numContainers, Map<String, LocalResource> localResources) throws
+ IOException, TezException {
- ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
- prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
+ ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create(HivePreWarmProcessor.class.getName());
+ prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, getContainerResource(conf),
- numContainers, null);
+ PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", prewarmProcDescriptor, numContainers, getContainerResource(conf));
Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
- combinedResources.putAll(sessionConfig.getSessionResources());
if (localResources != null) {
combinedResources.putAll(localResources);
}
- context.setLocalResources(combinedResources);
-
- /* boiler plate task env */
- Map<String, String> environment = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
- context.setEnvironment(environment);
- context.setJavaOpts(getContainerJavaOpts(conf));
- return context;
+ prewarmVertex.addTaskLocalFiles(localResources);
+ prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
+ prewarmVertex.setTaskEnvironment(getContainerEnvironment(conf, false));
+ return prewarmVertex;
}
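Unlike the old PreWarmContext, a PreWarmVertex is handed straight to the Tez session rather than run as part of a DAG. A hedged usage sketch; the session name, container count and the utils instance are illustrative, and TezClient.preWarm is assumed to be the 0.5 entry point:

    import java.util.HashMap;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.tez.client.TezClient;
    import org.apache.tez.dag.api.PreWarmVertex;
    import org.apache.tez.dag.api.TezConfiguration;

    // Sketch only: pre-warm containers in a session before the first query.
    TezConfiguration tezConf = new TezConfiguration();
    TezClient client = TezClient.create("hive-session", tezConf);
    client.start();
    PreWarmVertex warm = utils.createPreWarmVertex(tezConf, 4,
        new HashMap<String, LocalResource>());  // 4 containers, no extra resources
    client.preWarm(warm);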
/**
@@ -878,7 +872,7 @@ public class DagUtils {
public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
hiveConf.setBoolean("mapred.mapper.new-api", false);
- JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+ JobConf conf = new JobConf(hiveConf);
conf.set("mapred.output.committer.class",
NullOutputCommitter.class.getName());
@@ -967,9 +961,9 @@ public class DagUtils {
// final vertices need to have at least one output
if (!hasChildren) {
- v.addOutput("out_"+work.getName(),
- new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+ v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
+ OutputDescriptor.create(MROutput.class.getName())
+ .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
}
return v;
@@ -1037,16 +1031,16 @@ public class DagUtils {
if (edgeProp.isAutoReduce()) {
Configuration pluginConf = new Configuration(false);
VertexManagerPluginDescriptor desc =
- new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+ VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
pluginConf.setBoolean(
- ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
- pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+ pluginConf.setInt(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
edgeProp.getMinReducer());
pluginConf.setLong(
- ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
edgeProp.getInputSizePerReducer());
- ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
- desc.setUserPayload(payload.toByteArray());
+ UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
+ desc.setUserPayload(payload);
v.setVertexManagerPlugin(desc);
}
}
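These renamed ShuffleVertexManager keys drive Tez's reducer auto-parallelism. The same wiring in isolation; the minimum parallelism and target input size are example values, and an enclosing method that throws IOException is assumed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.common.TezUtils;
    import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
    import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;

    // Sketch only: let Tez shrink reducer parallelism at runtime.
    Configuration pluginConf = new Configuration(false);
    pluginConf.setBoolean(
        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
    pluginConf.setInt(
        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, 1);
    pluginConf.setLong(
        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
        256L * 1024 * 1024);  // aim for roughly 256MB of input per reducer
    VertexManagerPluginDescriptor desc =
        VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
    desc.setUserPayload(TezUtils.createUserPayloadFromConf(pluginConf));
    v.setVertexManagerPlugin(desc);  // v: the reducer vertex from the hunk above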