HIVE-19829: Incremental replication load should create tasks in execution phase rather than semantic phase (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/150ef3ba Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/150ef3ba Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/150ef3ba Branch: refs/heads/branch-3 Commit: 150ef3ba55edfcac26eeaa0574e95a2498096798 Parents: c191ea5 Author: Sankar Hariappan <[email protected]> Authored: Sat Jul 28 22:35:43 2018 +0530 Committer: Sankar Hariappan <[email protected]> Committed: Sat Jul 28 22:35:43 2018 +0530 ---------------------------------------------------------------------- .../TestReplicationScenariosAcidTables.java | 12 +- ...TestReplicationScenariosAcrossInstances.java | 77 ++++- ql/if/queryplan.thrift | 3 +- ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 8 +- ql/src/gen/thrift/gen-cpp/queryplan_types.h | 3 +- .../hadoop/hive/ql/plan/api/StageType.java | 5 +- ql/src/gen/thrift/gen-php/Types.php | 2 + ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 3 + ql/src/gen/thrift/gen-rb/queryplan_types.rb | 5 +- .../apache/hadoop/hive/ql/exec/TaskFactory.java | 4 +- .../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 344 +++++++++++++++++++ .../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 113 ++++++ .../hadoop/hive/ql/exec/repl/ReplUtils.java | 122 ------- .../repl/bootstrap/AddDependencyToLeaves.java | 51 --- .../ql/exec/repl/bootstrap/ReplLoadTask.java | 319 ----------------- .../ql/exec/repl/bootstrap/ReplLoadWork.java | 88 ----- .../filesystem/BootstrapEventsIterator.java | 9 + .../repl/bootstrap/load/LoadConstraint.java | 1 + .../exec/repl/bootstrap/load/LoadDatabase.java | 5 +- .../exec/repl/bootstrap/load/LoadFunction.java | 3 +- .../exec/repl/bootstrap/load/TaskTracker.java | 135 -------- .../bootstrap/load/table/LoadPartitions.java | 6 +- .../repl/bootstrap/load/table/LoadTable.java | 6 +- .../repl/bootstrap/load/table/TableContext.java | 2 +- .../IncrementalLoadEventsIterator.java | 73 ++++ .../IncrementalLoadTasksBuilder.java | 320 +++++++++++++++++ 
.../exec/repl/util/AddDependencyToLeaves.java | 51 +++ .../hive/ql/exec/repl/util/ReplUtils.java | 123 +++++++ .../hive/ql/exec/repl/util/TaskTracker.java | 145 ++++++++ .../ql/optimizer/QueryPlanPostProcessor.java | 2 +- .../apache/hadoop/hive/ql/parse/EximUtil.java | 2 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 295 +--------------- .../parse/repl/dump/io/PartitionSerializer.java | 2 +- .../ql/parse/repl/dump/io/TableSerializer.java | 2 +- .../repl/load/message/AlterDatabaseHandler.java | 2 +- .../repl/load/message/CommitTxnHandler.java | 2 +- .../repl/load/message/DropPartitionHandler.java | 2 +- .../bootstrap/AddDependencyToLeavesTest.java | 1 + .../repl/bootstrap/load/TestTaskTracker.java | 1 + 39 files changed, 1291 insertions(+), 1058 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index f1c0100..b523d9f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -444,7 +444,7 @@ public class TestReplicationScenariosAcidTables { WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); - replicaNonAcid.loadFailure(replicatedDbName, incrementalDump.dumpLocation) + replicaNonAcid.runFailure("REPL LOAD " + replicatedDbName + " FROM '" + incrementalDump.dumpLocation + "'") .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); } @@ 
-494,10 +494,8 @@ public class TestReplicationScenariosAcidTables { replica.run("use " + replicatedDbName) .run("repl status " + replicatedDbName) .verifyResult("null") - .run("show tables") - .verifyResults(new String[] { "t1" }) - .run("select id from t1") - .verifyResults(Arrays.asList("1")); + .run("show tables like t2") + .verifyResults(new String[] { }); // Retry with different dump should fail. replica.loadFailure(replicatedDbName, tuple2.dumpLocation); @@ -512,10 +510,6 @@ public class TestReplicationScenariosAcidTables { LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)); return false; } - if (args.tblName != null) { - LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); - return args.tblName.equals("t2"); - } return true; } }; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 7c043b9..b2e1e25 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.apache.hadoop.hive.shims.Utils; @@ -45,6 +45,8 @@ import org.junit.rules.TestName; 
import org.junit.rules.TestRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; +import org.junit.Assert; import java.io.IOException; import java.net.URI; @@ -80,10 +82,11 @@ public class TestReplicationScenariosAcrossInstances { protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); private static WarehouseInstance primary, replica; private String primaryDbName, replicatedDbName; + private static HiveConf conf; @BeforeClass public static void classLevelSetup() throws Exception { - Configuration conf = new Configuration(); + conf = new HiveConf(TestReplicationScenarios.class); conf.set("dfs.client.use.datanode.hostname", "true"); conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); MiniDFSCluster miniDFSCluster = @@ -874,6 +877,56 @@ public class TestReplicationScenariosAcrossInstances { } @Test + public void testIncrementalDumpMultiIteration() throws Throwable { + WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null); + + replica.load(replicatedDbName, bootstrapTuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(bootstrapTuple.lastReplicationId); + + WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName) + .run("create table table1 (id int) partitioned by (country string)") + .run("create table table2 (id int)") + .run("create table table3 (id int) partitioned by (country string)") + .run("insert into table1 partition(country='india') values(1)") + .run("insert into table2 values(2)") + .run("insert into table3 partition(country='india') values(3)") + .dump(primaryDbName, bootstrapTuple.lastReplicationId); + + replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'")) + .status(replicatedDbName) + .verifyResult(incremental.lastReplicationId) + .run("use " + replicatedDbName) + .run("select id from 
table1") + .verifyResults(new String[] {"1" }) + .run("select * from table2") + .verifyResults(new String[] {"2" }) + .run("select id from table3") + .verifyResults(new String[] {"3" }); + assert(IncrementalLoadTasksBuilder.getNumIteration() > 1); + + incremental = primary.run("use " + primaryDbName) + .run("create table table5 (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc") + .run("create table table4 (i int, j int)") + .run("insert into table4 values (1,2)") + .dump(primaryDbName, incremental.lastReplicationId); + + Path path = new Path(incremental.dumpLocation); + FileSystem fs = path.getFileSystem(conf); + FileStatus[] fileStatus = fs.listStatus(path); + int numEvents = fileStatus.length - 1; //one is metadata file + + replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'")) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" }) + .run("select i from table4") + .verifyResult("1"); + Assert.assertEquals(IncrementalLoadTasksBuilder.getNumIteration(), numEvents); + } + + @Test public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable { WarehouseInstance.Tuple tuplePrimary = primary .run("use " + primaryDbName) @@ -1088,9 +1141,7 @@ public class TestReplicationScenariosAcrossInstances { replica.run("use " + replicatedDbName) .run("repl status " + replicatedDbName) - .verifyResult("null") - .run("show tables") - .verifyResults(new String[] { "t1" }); + .verifyResult("null"); assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size()); assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, "t3").size()); assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, "t3").size()); @@ -1112,10 +1163,6 @@ public class TestReplicationScenariosAcrossInstances { LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " 
Func: " + String.valueOf(args.funcName)); return false; } - if (args.tblName != null) { - LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); - return (args.tblName.equals("t2") || args.tblName.equals("t3")); - } if (args.constraintTblName != null) { LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName)); return (args.constraintTblName.equals("t1") || args.constraintTblName.equals("t3")); @@ -1188,8 +1235,6 @@ public class TestReplicationScenariosAcrossInstances { public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws Throwable { WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) - .run("create table t1 (id int)") - .run("insert into table t1 values (10)") .run("create table t2 (place string) partitioned by (country string)") .run("insert into table t2 partition(country='india') values ('bangalore')") .run("insert into table t2 partition(country='uk') values ('london')") @@ -1204,7 +1249,7 @@ public class TestReplicationScenariosAcrossInstances { .dump(primaryDbName, null); // Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk". - // So, table "t1" and "t2" will exist and partition "india" will exist, rest failed as operation failed. + // So, table "t2" will exist and partition "india" will exist, rest failed as operation failed. 
BehaviourInjection<Partition, Partition> getPartitionStub = new BehaviourInjection<Partition, Partition>() { @Nullable @@ -1229,9 +1274,7 @@ public class TestReplicationScenariosAcrossInstances { .run("repl status " + replicatedDbName) .verifyResult("null") .run("show tables") - .verifyResults(new String[] { "t1", "t2" }) - .run("select id from t1") - .verifyResults(Arrays.asList("10")) + .verifyResults(new String[] {"t2" }) .run("select country from t2 order by country") .verifyResults(Arrays.asList("india")) .run("show functions like '" + replicatedDbName + "*'") @@ -1271,9 +1314,7 @@ public class TestReplicationScenariosAcrossInstances { .run("repl status " + replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("show tables") - .verifyResults(new String[] { "t1", "t2" }) - .run("select id from t1") - .verifyResults(Arrays.asList("10")) + .verifyResults(new String[] { "t2" }) .run("select country from t2 order by country") .verifyResults(Arrays.asList("india", "uk", "us")) .run("show functions like '" + replicatedDbName + "*'") http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/if/queryplan.thrift ---------------------------------------------------------------------- diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index d0ba704..697b5f5 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -104,7 +104,8 @@ enum StageType { REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, - REPL_TXN + REPL_TXN, + REPL_INCREMENTAL_LOAD } struct Stage { http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index e28ac4c..fd04675 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -121,7 +121,8 @@ int _kStageTypeValues[] = { StageType::REPL_DUMP, 
StageType::REPL_BOOTSTRAP_LOAD, StageType::REPL_STATE_LOG, - StageType::REPL_TXN + StageType::REPL_TXN, + StageType::REPL_INCREMENTAL_LOAD }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -139,9 +140,10 @@ const char* _kStageTypeNames[] = { "REPL_DUMP", "REPL_BOOTSTRAP_LOAD", "REPL_STATE_LOG", - "REPL_TXN" + "REPL_TXN", + "REPL_INCREMENTAL_LOAD" }; -const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(16, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(17, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-cpp/queryplan_types.h ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index 6bdea4b..8bd11c0 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -98,7 +98,8 @@ struct StageType { REPL_DUMP = 12, REPL_BOOTSTRAP_LOAD = 13, REPL_STATE_LOG = 14, - REPL_TXN = 15 + REPL_TXN = 15, + REPL_INCREMENTAL_LOAD = 16 }; }; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 08822b3..7eebe28 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -27,7 +27,8 @@ public enum StageType implements 
org.apache.thrift.TEnum { REPL_DUMP(12), REPL_BOOTSTRAP_LOAD(13), REPL_STATE_LOG(14), - REPL_TXN(15); + REPL_TXN(15), + REPL_INCREMENTAL_LOAD(16); private final int value; @@ -80,6 +81,8 @@ public enum StageType implements org.apache.thrift.TEnum { return REPL_STATE_LOG; case 15: return REPL_TXN; + case 16: + return REPL_INCREMENTAL_LOAD; default: return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-php/Types.php ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index 4ceec88..937dad2 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -120,6 +120,7 @@ final class StageType { const REPL_BOOTSTRAP_LOAD = 13; const REPL_STATE_LOG = 14; const REPL_TXN = 15; + const REPL_INCREMENTAL_LOAD = 16; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -137,6 +138,7 @@ final class StageType { 13 => 'REPL_BOOTSTRAP_LOAD', 14 => 'REPL_STATE_LOG', 15 => 'REPL_TXN', + 16 => 'REPL_INCREMENTAL_LOAD', ); } http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-py/queryplan/ttypes.py ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 5638d35..f61f27b 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -167,6 +167,7 @@ class StageType: REPL_BOOTSTRAP_LOAD = 13 REPL_STATE_LOG = 14 REPL_TXN = 15 + REPL_INCREMENTAL_LOAD = 16 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -185,6 +186,7 @@ class StageType: 13: "REPL_BOOTSTRAP_LOAD", 14: "REPL_STATE_LOG", 15: "REPL_TXN", + 16: "REPL_INCREMENTAL_LOAD", } _NAMES_TO_VALUES = { @@ -204,6 +206,7 @@ class StageType: "REPL_BOOTSTRAP_LOAD": 13, "REPL_STATE_LOG": 14, "REPL_TXN": 15, + "REPL_INCREMENTAL_LOAD": 16, } 
http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-rb/queryplan_types.rb ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index 04af975..2867df3 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -77,8 +77,9 @@ module StageType REPL_BOOTSTRAP_LOAD = 13 REPL_STATE_LOG = 14 REPL_TXN = 15 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN]).freeze + REPL_INCREMENTAL_LOAD = 16 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD]).freeze end class Adjacency http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 3a107b7..47a802f 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -29,8 +29,8 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java new file mode 100644 index 0000000..d6f41fa --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; +import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import 
org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.ErrorMsg; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; + +public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { + private final static int ZERO_TASKS = 0; + + @Override + public String getName() { + return (work.isIncrementalLoad() ? "REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD"); + } + + /** + * Provides the root Tasks created as a result of this loadTask run which will be executed + * by the driver. It does not track details across multiple runs of LoadTask. + */ + private static class Scope { + boolean database = false, table = false, partition = false; + List<Task<? extends Serializable>> rootTasks = new ArrayList<>(); + } + + @Override + protected int execute(DriverContext driverContext) { + if (work.isIncrementalLoad()) { + return executeIncrementalLoad(driverContext); + } else { + return executeBootStrapLoad(driverContext); + } + } + + private int executeBootStrapLoad(DriverContext driverContext) { + try { + int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); + Context context = new Context(work.dumpDirectory, conf, getHive(), + work.sessionStateLineageState, driverContext.getCtx()); + TaskTracker loadTaskTracker = new TaskTracker(maxTasks); + /* + for now for simplicity we are doing just one directory ( one database ), come back to use + of multiple databases once we have the basic flow to chain creating of tasks in place for + a database ( directory ) + */ + BootstrapEventsIterator iterator = work.iterator(); + ConstraintEventsIterator constraintIterator = work.constraintIterator(); + /* + This is used to get hold of a reference during the current creation of tasks and is initialized + with "0" tasks such that it will be non consequential in any operations done 
with task tracker + compositions. + */ + TaskTracker dbTracker = new TaskTracker(ZERO_TASKS); + TaskTracker tableTracker = new TaskTracker(ZERO_TASKS); + Scope scope = new Scope(); + boolean loadingConstraint = false; + if (!iterator.hasNext() && constraintIterator.hasNext()) { + loadingConstraint = true; + } + while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) { + BootstrapEvent next; + if (!loadingConstraint) { + next = iterator.next(); + } else { + next = constraintIterator.next(); + } + switch (next.eventType()) { + case Database: + DatabaseEvent dbEvent = (DatabaseEvent) next; + dbTracker = + new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker) + .tasks(); + loadTaskTracker.update(dbTracker); + if (work.hasDbState()) { + loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); + } + work.updateDbEventState(dbEvent.toState()); + if (dbTracker.hasTasks()) { + scope.rootTasks.addAll(dbTracker.tasks()); + scope.database = true; + } + dbTracker.debugLog("database"); + break; + case Table: { + /* + Implicit assumption here is that database level is processed first before table level, + which will depend on the iterator used since it should provide the higher level directory + listing before providing the lower level listing. This is also required such that + the dbTracker / tableTracker are setup correctly always. 
+ */ + TableContext tableContext = + new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn); + TableEvent tableEvent = (TableEvent) next; + LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), + tableContext, loadTaskTracker); + tableTracker = loadTable.tasks(); + setUpDependencies(dbTracker, tableTracker); + if (!scope.database && tableTracker.hasTasks()) { + scope.rootTasks.addAll(tableTracker.tasks()); + scope.table = true; + } + /* + for table replication if we reach the max number of tasks then for the next run we will + try to reload the same table again, this is mainly for ease of understanding the code + as then we can avoid handling == > loading partitions for the table given that + the creation of table lead to reaching max tasks vs, loading next table since current + one does not have partitions. + */ + + // for a table we explicitly try to load partitions as there is no separate partitions events. + LoadPartitions loadPartitions = + new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent, + work.dbNameToLoadIn, tableContext); + TaskTracker partitionsTracker = loadPartitions.tasks(); + partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, + partitionsTracker); + tableTracker.debugLog("table"); + partitionsTracker.debugLog("partitions for table"); + break; + } + case Partition: { + /* + This will happen only when loading tables and we reach the limit of number of tasks we can create; + hence we know here that the table should exist and there should be a lastPartitionName + */ + PartitionEvent event = (PartitionEvent) next; + TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn, + work.tableNameToLoadIn); + LoadPartitions loadPartitions = + new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker, + event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated()); + /* + the tableTracker here should be a new 
instance and not an existing one as this can + only happen when we break in between loading partitions. + */ + TaskTracker partitionsTracker = loadPartitions.tasks(); + partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, + partitionsTracker); + partitionsTracker.debugLog("partitions"); + break; + } + case Function: { + LoadFunction loadFunction = new LoadFunction(context, iterator.replLogger(), + (FunctionEvent) next, work.dbNameToLoadIn, dbTracker); + TaskTracker functionsTracker = loadFunction.tasks(); + if (!scope.database) { + scope.rootTasks.addAll(functionsTracker.tasks()); + } else { + setUpDependencies(dbTracker, functionsTracker); + } + loadTaskTracker.update(functionsTracker); + functionsTracker.debugLog("functions"); + break; + } + case Constraint: { + LoadConstraint loadConstraint = + new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker); + TaskTracker constraintTracker = loadConstraint.tasks(); + scope.rootTasks.addAll(constraintTracker.tasks()); + loadTaskTracker.update(constraintTracker); + constraintTracker.debugLog("constraints"); + } + } + + if (!loadingConstraint && !iterator.currentDbHasNext()) { + createEndReplLogTask(context, scope, iterator.replLogger()); + } + } + boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState() + || constraintIterator.hasNext(); + if (addAnotherLoadTask) { + createBuilderTask(scope.rootTasks); + } + if (!iterator.hasNext() && !constraintIterator.hasNext()) { + loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); + work.updateDbEventState(null); + } + this.childTasks = scope.rootTasks; + /* + Since there can be multiple rounds of this run all of which will be tied to the same + query id -- generated in compile phase , adding a additional UUID to the end to print each run + in separate files. 
+ */ + LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); + + // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later + driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs()); + } catch (RuntimeException e) { + LOG.error("replication failed with run time exception", e); + throw e; + } catch (Exception e) { + LOG.error("replication failed", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + LOG.info("completed load task run : {}", work.executedLoadTask()); + return 0; + } + + private void createEndReplLogTask(Context context, Scope scope, + ReplLogger replLogger) throws SemanticException { + Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn); + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters()); + Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork); + if (scope.rootTasks.isEmpty()) { + scope.rootTasks.add(replLogTask); + } else { + DAGTraversal.traverse(scope.rootTasks, + new AddDependencyToLeaves(Collections.singletonList(replLogTask))); + } + } + + /** + * There was a database update done before and we want to make sure we update the last repl + * id on this database as we are now going to switch to processing a new database. + * + * This has to be last task in the graph since if there are intermediate tasks and the last.repl.id + * is a root level task then in the execution phase the root level tasks will get executed first, + * however if any of the child tasks of the bootstrap load failed then even though the bootstrap has failed + * the last repl status of the target database will return a valid value, which will not represent + * the state of the database. 
+ */ + private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope) + throws SemanticException { + /* + A fresh TaskTracker is passed in since we don't want to put any limits on this task: it is + essential before we start processing new database events. + */ + TaskTracker taskTracker = + new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn, + new TaskTracker(maxTasks)).tasks(); + + AddDependencyToLeaves function = new AddDependencyToLeaves(taskTracker.tasks()); + DAGTraversal.traverse(scope.rootTasks, function); + + return taskTracker; + } + + private void partitionsPostProcessing(BootstrapEventsIterator iterator, + Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker, + TaskTracker partitionsTracker) throws SemanticException { + setUpDependencies(tableTracker, partitionsTracker); + if (!scope.database && !scope.table) { + scope.rootTasks.addAll(partitionsTracker.tasks()); + scope.partition = true; + } + loadTaskTracker.update(tableTracker); + loadTaskTracker.update(partitionsTracker); + if (partitionsTracker.hasReplicationState()) { + iterator.setReplicationState(partitionsTracker.replicationState()); + } + } + + /* + This sets up dependencies such that a child task is dependent on the parent being complete. + */ + private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) { + if (parentTasks.hasTasks()) { + for (Task<? extends Serializable> parentTask : parentTasks.tasks()) { + for (Task<? extends Serializable> childTask : childTasks.tasks()) { + parentTask.addDependentTask(childTask); + } + } + } else { + for (Task<? extends Serializable> childTask : childTasks.tasks()) { + parentTasks.addTask(childTask); + } + } + } + + private void createBuilderTask(List<Task<? 
extends Serializable>> rootTasks) { + // Use loadTask as dependencyCollection + Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf); + DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask)); + } + + @Override + public StageType getType() { + return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD; + } + + private int executeIncrementalLoad(DriverContext driverContext) { + try { + IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder(); + this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG)); + if (work.getIncrementalIterator().hasNext()) { + // attach a load task at the tail of task list to start the next iteration. + createBuilderTask(this.childTasks); + } + return 0; + } catch (Exception e) { + LOG.error("failed replication", e); + setException(e); + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java new file mode 100644 index 0000000..8921e94 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.session.LineageState; + +import java.io.IOException; +import java.io.Serializable; + +@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER, + Explain.Level.DEFAULT, + Explain.Level.EXTENDED }) +public class ReplLoadWork implements Serializable { + final String dbNameToLoadIn; + final String tableNameToLoadIn; + final String dumpDirectory; + private final transient BootstrapEventsIterator bootstrapIterator; + private final ConstraintEventsIterator constraintsIterator; + private final transient IncrementalLoadEventsIterator incrementalIterator; + private int loadTaskRunCount = 0; + private DatabaseEvent.State state = null; + private final transient IncrementalLoadTasksBuilder incrementalLoad; + + /* + these are sessionState objects that are copied over to work to allow for parallel execution. 
+ based on the current use case the methods are selectively synchronized, which might need to be + taken care when using other methods. + */ + final LineageState sessionStateLineageState; + + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, + String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump) throws IOException { + this.tableNameToLoadIn = tableNameToLoadIn; + sessionStateLineageState = lineageState; + this.dumpDirectory = dumpDirectory; + this.dbNameToLoadIn = dbNameToLoadIn; + if (isIncrementalDump) { + incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf); + this.bootstrapIterator = null; + this.constraintsIterator = null; + incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory, + incrementalIterator, hiveConf); + } else { + this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); + this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); + incrementalIterator = null; + incrementalLoad = null; + } + } + + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, + LineageState lineageState) throws IOException { + this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, false); + } + + public BootstrapEventsIterator iterator() { + return bootstrapIterator; + } + + public ConstraintEventsIterator constraintIterator() { + return constraintsIterator; + } + + int executedLoadTask() { + return ++loadTaskRunCount; + } + + void updateDbEventState(DatabaseEvent.State state) { + this.state = state; + } + + DatabaseEvent databaseEvent(HiveConf hiveConf) { + return state.toEvent(hiveConf); + } + + boolean hasDbState() { + return state != null; + } + + public boolean isIncrementalLoad() { + return incrementalIterator != null; + } + + public IncrementalLoadEventsIterator getIncrementalIterator() { + return incrementalIterator; + } + + public 
IncrementalLoadTasksBuilder getIncrementalLoadTaskBuilder() { + return incrementalLoad; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java deleted file mode 100644 index e41e4b5..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.exec.repl; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; -import org.apache.hadoop.hive.ql.plan.AlterTableDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.hive.ql.plan.ImportTableDesc; -import org.apache.hadoop.hive.ql.stats.StatsUtils; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - - -public class ReplUtils { - - public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key"; - - /** - * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. - */ - public enum ReplLoadOpType { - LOAD_NEW, LOAD_SKIP, LOAD_REPLACE - } - - public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs( - Table table, List<Map<String, String>> partitions) throws SemanticException { - Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>(); - int partPrefixLength = 0; - if (partitions.size() > 0) { - partPrefixLength = partitions.get(0).size(); - // pick the length of the first ptn, we expect all ptns listed to have the same number of - // key-vals. 
- } - List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>(); - for (Map<String, String> ptn : partitions) { - // convert each key-value-map to appropriate expression. - ExprNodeGenericFuncDesc expr = null; - for (Map.Entry<String, String> kvp : ptn.entrySet()) { - String key = kvp.getKey(); - Object val = kvp.getValue(); - String type = table.getPartColByName(key).getType(); - PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type); - ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true); - ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate( - "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val)); - expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op); - } - if (expr != null) { - partitionDesc.add(expr); - } - } - if (partitionDesc.size() > 0) { - partSpecs.put(partPrefixLength, partitionDesc); - } - return partSpecs; - } - - public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf) - throws SemanticException { - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType()); - return TaskFactory.get(replLogWork, conf); - } - - public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec, - String dumpRoot, HiveConf conf) throws SemanticException { - HashMap<String, String> mapProp = new HashMap<>(); - mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot); - - AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS); - alterTblDesc.setProps(mapProp); - alterTblDesc.setOldName( - StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName())); - if (partSpec != null) { - alterTblDesc.setPartSpec(partSpec); - } - return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf); - } - - public static boolean replCkptStatus(String dbName, 
Map<String, String> props, String dumpRoot) - throws InvalidOperationException { - // If ckpt property not set or empty means, bootstrap is not run on this object. - if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && !props.get(REPL_CHECKPOINT_KEY).isEmpty()) { - if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) { - return true; - } - throw new InvalidOperationException(ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.format(dumpRoot, - props.get(REPL_CHECKPOINT_KEY))); - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java deleted file mode 100644 index 0313058..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec.repl.bootstrap; - -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; - -import java.io.Serializable; -import java.util.Collections; -import java.util.List; - -public class AddDependencyToLeaves implements DAGTraversal.Function { - private List<Task<? extends Serializable>> postDependencyCollectionTasks; - - AddDependencyToLeaves(List<Task<? extends Serializable>> postDependencyCollectionTasks) { - this.postDependencyCollectionTasks = postDependencyCollectionTasks; - } - - public AddDependencyToLeaves(Task<? extends Serializable> postDependencyTask) { - this(Collections.singletonList(postDependencyTask)); - } - - - @Override - public void process(Task<? extends Serializable> task) { - if (task.getChildTasks() == null) { - postDependencyCollectionTasks.forEach(task::addDependentTask); - } - } - - @Override - public boolean skipProcessing(Task<? extends Serializable> task) { - return postDependencyCollectionTasks.contains(task); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java deleted file mode 100644 index b33a774..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.repl.bootstrap; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; -import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; -import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; -import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.ErrorMsg; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; - -public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { - private final static int ZERO_TASKS = 0; - - @Override - public String getName() { - return "REPL_BOOTSTRAP_LOAD"; - } - - /** - * Provides the root Tasks created as a result of this loadTask run which will be executed - * by the driver. It does not track details across multiple runs of LoadTask. - */ - private static class Scope { - boolean database = false, table = false, partition = false; - List<Task<? 
extends Serializable>> rootTasks = new ArrayList<>(); - } - - @Override - protected int execute(DriverContext driverContext) { - try { - int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - Context context = new Context(work.dumpDirectory, conf, getHive(), - work.sessionStateLineageState, driverContext.getCtx()); - TaskTracker loadTaskTracker = new TaskTracker(maxTasks); - /* - for now for simplicity we are doing just one directory ( one database ), come back to use - of multiple databases once we have the basic flow to chain creating of tasks in place for - a database ( directory ) - */ - BootstrapEventsIterator iterator = work.iterator(); - ConstraintEventsIterator constraintIterator = work.constraintIterator(); - /* - This is used to get hold of a reference during the current creation of tasks and is initialized - with "0" tasks such that it will be non consequential in any operations done with task tracker - compositions. - */ - TaskTracker dbTracker = new TaskTracker(ZERO_TASKS); - TaskTracker tableTracker = new TaskTracker(ZERO_TASKS); - Scope scope = new Scope(); - boolean loadingConstraint = false; - if (!iterator.hasNext() && constraintIterator.hasNext()) { - loadingConstraint = true; - } - while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) { - BootstrapEvent next; - if (!loadingConstraint) { - next = iterator.next(); - } else { - next = constraintIterator.next(); - } - switch (next.eventType()) { - case Database: - DatabaseEvent dbEvent = (DatabaseEvent) next; - dbTracker = - new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker) - .tasks(); - loadTaskTracker.update(dbTracker); - if (work.hasDbState()) { - loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); - } - work.updateDbEventState(dbEvent.toState()); - if (dbTracker.hasTasks()) { - scope.rootTasks.addAll(dbTracker.tasks()); - scope.database = true; - } - 
dbTracker.debugLog("database"); - break; - case Table: { - /* - Implicit assumption here is that database level is processed first before table level, - which will depend on the iterator used since it should provide the higher level directory - listing before providing the lower level listing. This is also required such that - the dbTracker / tableTracker are setup correctly always. - */ - TableContext tableContext = - new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn); - TableEvent tableEvent = (TableEvent) next; - LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), - tableContext, loadTaskTracker); - tableTracker = loadTable.tasks(); - setUpDependencies(dbTracker, tableTracker); - if (!scope.database && tableTracker.hasTasks()) { - scope.rootTasks.addAll(tableTracker.tasks()); - scope.table = true; - } - /* - for table replication if we reach the max number of tasks then for the next run we will - try to reload the same table again, this is mainly for ease of understanding the code - as then we can avoid handling == > loading partitions for the table given that - the creation of table lead to reaching max tasks vs, loading next table since current - one does not have partitions. - */ - - // for a table we explicitly try to load partitions as there is no separate partitions events. 
- LoadPartitions loadPartitions = - new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent, - work.dbNameToLoadIn, tableContext); - TaskTracker partitionsTracker = loadPartitions.tasks(); - partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, - partitionsTracker); - tableTracker.debugLog("table"); - partitionsTracker.debugLog("partitions for table"); - break; - } - case Partition: { - /* - This will happen only when loading tables and we reach the limit of number of tasks we can create; - hence we know here that the table should exist and there should be a lastPartitionName - */ - PartitionEvent event = (PartitionEvent) next; - TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn, - work.tableNameToLoadIn); - LoadPartitions loadPartitions = - new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker, - event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated()); - /* - the tableTracker here should be a new instance and not an existing one as this can - only happen when we break in between loading partitions. 
- */ - TaskTracker partitionsTracker = loadPartitions.tasks(); - partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, - partitionsTracker); - partitionsTracker.debugLog("partitions"); - break; - } - case Function: { - LoadFunction loadFunction = new LoadFunction(context, iterator.replLogger(), - (FunctionEvent) next, work.dbNameToLoadIn, dbTracker); - TaskTracker functionsTracker = loadFunction.tasks(); - if (!scope.database) { - scope.rootTasks.addAll(functionsTracker.tasks()); - } else { - setUpDependencies(dbTracker, functionsTracker); - } - loadTaskTracker.update(functionsTracker); - functionsTracker.debugLog("functions"); - break; - } - case Constraint: { - LoadConstraint loadConstraint = - new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker); - TaskTracker constraintTracker = loadConstraint.tasks(); - scope.rootTasks.addAll(constraintTracker.tasks()); - loadTaskTracker.update(constraintTracker); - constraintTracker.debugLog("constraints"); - } - } - - if (!loadingConstraint && !iterator.currentDbHasNext()) { - createEndReplLogTask(context, scope, iterator.replLogger()); - } - } - boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState() - || constraintIterator.hasNext(); - createBuilderTask(scope.rootTasks, addAnotherLoadTask); - if (!iterator.hasNext() && !constraintIterator.hasNext()) { - loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); - work.updateDbEventState(null); - } - this.childTasks = scope.rootTasks; - /* - Since there can be multiple rounds of this run all of which will be tied to the same - query id -- generated in compile phase , adding a additional UUID to the end to print each run - in separate files. 
- */ - LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); - - // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later - driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs()); - } catch (Exception e) { - LOG.error("failed replication", e); - setException(e); - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); - } - LOG.info("completed load task run : {}", work.executedLoadTask()); - return 0; - } - - private void createEndReplLogTask(Context context, Scope scope, - ReplLogger replLogger) throws SemanticException { - Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn); - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters()); - Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork); - if (scope.rootTasks.isEmpty()) { - scope.rootTasks.add(replLogTask); - } else { - DAGTraversal.traverse(scope.rootTasks, - new AddDependencyToLeaves(Collections.singletonList(replLogTask))); - } - } - - /** - * There was a database update done before and we want to make sure we update the last repl - * id on this database as we are now going to switch to processing a new database. - * - * This has to be last task in the graph since if there are intermediate tasks and the last.repl.id - * is a root level task then in the execution phase the root level tasks will get executed first, - * however if any of the child tasks of the bootstrap load failed then even though the bootstrap has failed - * the last repl status of the target database will return a valid value, which will not represent - * the state of the database. 
- */ - private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope) - throws SemanticException { - /* - we don't want to put any limits on this task as this is essential before we start - processing new database events. - */ - TaskTracker taskTracker = - new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn, - new TaskTracker(maxTasks)).tasks(); - - AddDependencyToLeaves function = new AddDependencyToLeaves(taskTracker.tasks()); - DAGTraversal.traverse(scope.rootTasks, function); - - return taskTracker; - } - - private void partitionsPostProcessing(BootstrapEventsIterator iterator, - Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker, - TaskTracker partitionsTracker) throws SemanticException { - setUpDependencies(tableTracker, partitionsTracker); - if (!scope.database && !scope.table) { - scope.rootTasks.addAll(partitionsTracker.tasks()); - scope.partition = true; - } - loadTaskTracker.update(tableTracker); - loadTaskTracker.update(partitionsTracker); - if (partitionsTracker.hasReplicationState()) { - iterator.setReplicationState(partitionsTracker.replicationState()); - } - } - - /* - This sets up dependencies such that a child task is dependant on the parent to be complete. - */ - private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) { - if (parentTasks.hasTasks()) { - for (Task<? extends Serializable> parentTask : parentTasks.tasks()) { - for (Task<? extends Serializable> childTask : childTasks.tasks()) { - parentTask.addDependentTask(childTask); - } - } - } else { - for (Task<? extends Serializable> childTask : childTasks.tasks()) { - parentTasks.addTask(childTask); - } - } - } - - private void createBuilderTask(List<Task<? 
extends Serializable>> rootTasks, - boolean shouldCreateAnotherLoadTask) { - /* - use loadTask as dependencyCollection - */ - if (shouldCreateAnotherLoadTask) { - Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf); - DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask)); - } - } - - @Override - public StageType getType() { - return StageType.REPL_BOOTSTRAP_LOAD; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java deleted file mode 100644 index 048727f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.exec.repl.bootstrap; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; -import org.apache.hadoop.hive.ql.plan.Explain; -import org.apache.hadoop.hive.ql.session.LineageState; - -import java.io.IOException; -import java.io.Serializable; - -@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER, - Explain.Level.DEFAULT, - Explain.Level.EXTENDED }) -public class ReplLoadWork implements Serializable { - final String dbNameToLoadIn; - final String tableNameToLoadIn; - final String dumpDirectory; - private final BootstrapEventsIterator iterator; - private final ConstraintEventsIterator constraintsIterator; - private int loadTaskRunCount = 0; - private DatabaseEvent.State state = null; - - /* - these are sessionState objects that are copied over to work to allow for parallel execution. - based on the current use case the methods are selectively synchronized, which might need to be - taken care when using other methods. 
- */ - final LineageState sessionStateLineageState; - - public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn, LineageState lineageState) - throws IOException { - this.tableNameToLoadIn = tableNameToLoadIn; - sessionStateLineageState = lineageState; - this.dumpDirectory = dumpDirectory; - this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); - this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); - this.dbNameToLoadIn = dbNameToLoadIn; - } - - public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, - LineageState lineageState) throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState); - } - - public BootstrapEventsIterator iterator() { - return iterator; - } - - public ConstraintEventsIterator constraintIterator() { - return constraintsIterator; - } - - int executedLoadTask() { - return ++loadTaskRunCount; - } - - void updateDbEventState(DatabaseEvent.State state) { - this.state = state; - } - - DatabaseEvent databaseEvent(HiveConf hiveConf) { - return state.toEvent(hiveConf); - } - - boolean hasDbState() { - return state != null; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java index 89d2ac2..ebe0090 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java @@ -82,6 +82,15 @@ public class BootstrapEventsIterator 
implements Iterator<BootstrapEvent> { FileSystem fileSystem = path.getFileSystem(hiveConf); FileStatus[] fileStatuses = fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem)); + if ((fileStatuses == null) || (fileStatuses.length == 0)) { + throw new IllegalArgumentException("No data to load in path " + dumpDirectory); + } + if ((dbNameToLoadIn != null) && (fileStatuses.length > 1)) { + throw new IllegalArgumentException( + "Multiple dirs in " + + dumpDirectory + + " does not correspond to REPL LOAD expecting to load to a singular destination point."); + } List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(f -> { Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME); http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java index 26f4892..d09b98c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 7ddae6f..0fd305a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -31,8 +32,8 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PrincipalDesc; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType; import java.io.Serializable; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index b886ff4..a7c8ca4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java 
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -26,9 +26,10 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.EximUtil; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java deleted file mode 100644 index f8f0801..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; - -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; -import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -/** - * This class will be responsible to track how many tasks have been created, - * organization of tasks such that after the number of tasks for next execution are created - * we create a dependency collection task(DCT) -> another bootstrap task, - * and then add DCT as dependent to all existing tasks that are created so the cycle can continue. - */ -public class TaskTracker { - private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class); - /** - * used to identify the list of tasks at root level for a given level like table / db / partition. - * this does not include the task dependency notion of "table tasks < ---- partition task" - */ - private final List<Task<? extends Serializable>> tasks = new ArrayList<>(); - private ReplicationState replicationState = null; - // since tasks themselves can be graphs we want to limit the number of created - // tasks including all of dependencies. 
- private int numberOfTasks = 0; - private final int maxTasksAllowed; - - public TaskTracker(int defaultMaxTasks) { - maxTasksAllowed = defaultMaxTasks; - } - - public TaskTracker(TaskTracker existing) { - maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks; - } - - /** - * this method is used to identify all the tasks in a graph. - * the graph however might get created in a disjoint fashion, in which case we can just update - * the number of tasks using the "update" method. - */ - public void addTask(Task<? extends Serializable> task) { - tasks.add(task); - - List <Task<? extends Serializable>> visited = new ArrayList<>(); - updateTaskCount(task, visited); - } - - // This method is used to traverse the DAG created in tasks list and add the dependent task to - // the tail of each task chain. - public void addDependentTask(Task<? extends Serializable> dependent) { - if (tasks.isEmpty()) { - addTask(dependent); - } else { - DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent)); - - List<Task<? extends Serializable>> visited = new ArrayList<>(); - updateTaskCount(dependent, visited); - } - } - - private void updateTaskCount(Task<? extends Serializable> task, - List <Task<? extends Serializable>> visited) { - numberOfTasks += 1; - visited.add(task); - if (task.getChildTasks() != null) { - for (Task<? 
extends Serializable> childTask : task.getChildTasks()) { - if (visited.contains(childTask)) { - continue; - } - updateTaskCount(childTask, visited); - } - } - } - - public boolean canAddMoreTasks() { - return numberOfTasks < maxTasksAllowed; - } - - public boolean hasTasks() { - return numberOfTasks != 0; - } - - public void update(TaskTracker withAnother) { - numberOfTasks += withAnother.numberOfTasks; - if (withAnother.hasReplicationState()) { - this.replicationState = withAnother.replicationState; - } - } - - public void setReplicationState(ReplicationState state) { - this.replicationState = state; - } - - public boolean hasReplicationState() { - return replicationState != null; - } - - public ReplicationState replicationState() { - return replicationState; - } - - public List<Task<? extends Serializable>> tasks() { - return tasks; - } - - public void debugLog(String forEventType) { - LOG.debug("{} event with total / root number of tasks:{}/{}", forEventType, numberOfTasks, - tasks.size()); - } - - public int numberOfTasks() { - return numberOfTasks; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index f6493f7..c0cfc43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; -import 
org.apache.hadoop.hive.ql.exec.repl.ReplUtils; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 419a511..089b529 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -27,10 +27,10 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import 
org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java index b5b5b90..8e01fb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
