Update falcon branch 0.10-refactored-ui to be up to date with branch 0.10

Author: bvellanki <[email protected]>
Author: yzheng-hortonworks <[email protected]>
Author: Praveen Adlakha <[email protected]>
Author: peeyush b <[email protected]>
Author: Venkatesan Ramachandran <[email protected]>
Author: Pallavi Rao <[email protected]>
Author: Sowmya Ramesh <[email protected]>
Author: Peeyush <[email protected]>
Author: Venkat Ranganathan <[email protected]>
Author: Sowmya Ramesh <[email protected]>
Author: Srikanth Sundarrajan <[email protected]>
Author: Ajay Yadava <[email protected]>
Author: Ajay Yadava <[email protected]>
Author: Murali Ramasami <murali dot msse at gmail dot com>
Author: sandeep <[email protected]>
Reviewers: "Sowmya Ramesh <[email protected]>"

Closes #267 from bvellanki/0.10-refactored-ui

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/01a303e3
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/01a303e3
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/01a303e3

Branch: refs/heads/0.10-refactored-ui
Commit: 01a303e3bdfe8a71d069531ff7b01325cca5165e
Parents: 48b877a
Author: bvellanki <[email protected]>
Authored: Tue Aug 16 15:48:15 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Tue Aug 16 15:48:15 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt | 60 +----
 Installation-steps.txt | 24 +-
 LICENSE.txt | 20 +-
 NOTICE.txt | 9 +-
 acquisition/pom.xml | 2 +-
 addons/adf/pom.xml | 2 +-
 addons/designer/pom.xml | 2 +-
 addons/extensions/hdfs-mirroring/pom.xml | 2 +-
 .../runtime/hdfs-mirroring-workflow.xml | 4 +
 .../extensions/hdfs-snapshot-mirroring/pom.xml | 2 +-
 addons/extensions/hive-mirroring/pom.xml | 2 +-
 .../runtime/hive-mirroring-secure-workflow.xml | 48 ++--
 .../runtime/hive-mirroring-workflow.xml | 48 ++--
 addons/hdfs-snapshot-mirroring/pom.xml | 11 +-
 .../replication/HdfsSnapshotReplicator.java | 8 +-
 .../retention/HdfsSnapshotEvictor.java | 6 +-
 .../falcon/snapshots/util/HdfsSnapshotUtil.java | 15 +-
 .../replication/HdfsSnapshotReplicatorTest.java | 12 +-
 addons/hivedr/pom.xml | 11 +-
 .../java/org/apache/falcon/hive/HiveDRArgs.java | 7 +-
 .../org/apache/falcon/hive/HiveDROptions.java | 37 +++-
 .../java/org/apache/falcon/hive/HiveDRTool.java | 7 +
 .../falcon/hive/LastReplicatedEvents.java | 4 +-
 .../falcon/hive/mapreduce/CopyMapper.java | 16 +-
 .../falcon/hive/mapreduce/CopyReducer.java | 16 +-
 .../falcon/hive/util/EventSourcerUtils.java | 4 +-
 .../org/apache/falcon/hive/util/EventUtils.java | 23 +-
 .../org/apache/falcon/hive/util/FileUtils.java | 6 +-
 .../falcon/hive/util/HiveDRStatusStore.java | 28 ++-
 .../apache/falcon/hive/util/HiveDRUtils.java | 15 +-
 .../java/org/apache/falcon/hive/DRTest.java | 4 +-
 archival/pom.xml | 2 +-
 build-tools/pom.xml | 2 +-
 cli/pom.xml | 3 +-
 client/pom.xml | 3 +-
 common/pom.xml | 2 +-
 .../org/apache/falcon/entity/ClusterHelper.java | 5 +-
 .../apache/falcon/entity/FileSystemStorage.java | 11 +-
 .../falcon/entity/WorkflowNameBuilder.java | 7 -
 .../entity/parser/ClusterEntityParser.java | 17 +-
 .../falcon/entity/store/ConfigurationStore.java | 29 ++-
 .../apache/falcon/entity/v0/EntityGraph.java | 10 +
 .../falcon/hadoop/HadoopClientFactory.java | 4 +-
 .../falcon/metadata/GraphUpdateUtils.java | 113 ++++++++++
 .../falcon/metadata/MetadataMappingService.java | 74 ++++++-
 .../AuthenticationInitializationService.java | 18 +-
 .../security/DefaultAuthorizationProvider.java | 8 +-
 .../workflow/WorkflowExecutionContext.java | 4 +-
 .../WorkflowJobEndNotificationService.java | 6 +-
 common/src/main/resources/startup.properties | 21 +-
 .../entity/parser/FeedEntityParserTest.java | 3 +-
 .../falcon/entity/v0/EntityGraphTest.java | 73 +++++-
 .../metadata/MetadataMappingServiceTest.java | 6 +-
 .../DefaultAuthorizationProviderTest.java | 4 +-
 distro/pom.xml | 4 +-
 docs/license/animate-LICENSE.txt | 21 ++
 docs/license/cabin-font-LICENSE.txt | 95 ++++++++
 docs/license/ngMask-LICENSE.txt | 13 ++
 docs/license/ngTagsInput-LICENSE.txt | 20 ++
 docs/license/normalize-LICENSE.txt | 22 ++
 docs/pom.xml | 2 +-
 docs/src/site/twiki/Configuration.twiki | 62 +++++-
 docs/src/site/twiki/DataReplicationAzure.twiki | 61 +++++
 docs/src/site/twiki/Embedded-mode.twiki | 3 +-
 docs/src/site/twiki/EntitySpecification.twiki | 59 ++++-
 docs/src/site/twiki/Extensions.twiki | 3 +-
 docs/src/site/twiki/FalconDocumentation.twiki | 4 +-
 .../site/twiki/GraphiteMetricCollection.twiki | 22 ++
 docs/src/site/twiki/InstallationSteps.twiki | 15 +-
 docs/src/site/twiki/MigrationInstructions.twiki | 29 ++-
 docs/src/site/twiki/Operability.twiki | 5 +
 docs/src/site/twiki/restapi/EntityList.twiki | 44 ++--
 docs/src/site/twiki/restapi/ResourceList.twiki | 2 +-
 examples/app/spark/wordcount.py | 52 +++++
 examples/entity/spark/pyspark-process.xml | 52 +++++
 examples/entity/spark/spark-process-pi.xml | 44 ++++
 examples/entity/spark/spark-process.xml | 53 +++++
 examples/entity/spark/spark-sql-process.xml | 55 +++++
 examples/pom.xml | 41 +++-
 .../apache/falcon/example/spark/SparkPI.java | 72 ++++++
 .../example/spark/SparkSQLProcessTable.java | 51 +++++
 .../falcon/example/spark/SparkWordCount.java | 74 +++++++
 extensions/pom.xml | 2 +-
 .../mirroring/hive/HiveMirroringExtension.java | 19 +-
 .../hive/HiveMirroringExtensionProperties.java | 9 +-
 falcon-regression/merlin-core/pom.xml | 2 +-
 falcon-regression/merlin/pom.xml | 2 +-
 .../falcon/regression/FeedLateRerunTest.java | 2 +-
 .../falcon/regression/FeedReplicationTest.java | 2 +-
 .../falcon/regression/InstanceSummaryTest.java | 4 +-
 .../ProcessInstanceColoMixedTest.java | 2 +-
 .../regression/hcat/HCatFeedOperationsTest.java | 8 +-
 .../regression/hcat/HCatReplicationTest.java | 2 +-
 .../regression/hive/dr/HdfsRecipeTest.java | 2 +-
 .../falcon/regression/hive/dr/HiveDRTest.java | 2 +-
 .../falcon/regression/hive/dr/HiveDbDRTest.java | 2 +-
 .../regression/lineage/EntitySummaryTest.java | 2 +-
 .../lineage/ListFeedInstancesTest.java | 2 +-
 .../nativeScheduler/NativeScheduleTest.java | 2 +-
 .../prism/PrismFeedReplicationUpdateTest.java | 2 +-
 .../regression/prism/PrismFeedScheduleTest.java | 2 +-
 .../regression/prism/PrismFeedUpdateTest.java | 2 +-
 .../regression/prism/PrismProcessSnSTest.java | 8 +-
 .../falcon/regression/searchUI/MirrorTest.java | 2 +-
 falcon-regression/pom.xml | 4 +-
 falcon-ui/app/css/img/user.svg | 16 ++
 falcon-ui/app/css/styles/autocomplete-tags.less | 20 +-
 falcon-ui/app/js/lib/popover.js | 20 +-
 falcon-ui/app/test/e2e/protractor.js | 20 +-
 falcon-ui/karma.conf.js | 18 ++
 falcon-ui/pom.xml | 2 +-
 hadoop-dependencies/pom.xml | 2 +-
 lifecycle/pom.xml | 2 +-
 .../resources/action/feed/eviction-action.xml | 2 +-
 messaging/pom.xml | 2 +-
 .../falcon/messaging/JMSMessageConsumer.java | 3 +-
 .../messaging/JMSMessageConsumerTest.java | 57 +----
 metrics/pom.xml | 2 +-
 oozie-el-extensions/pom.xml | 2 +-
 .../oozie/extensions/OozieELExtensions.java | 2 +-
 .../oozie/extensions/TestOozieELExtensions.java | 2 +
 oozie/pom.xml | 2 +-
 .../feed/FeedReplicationCoordinatorBuilder.java | 5 -
 .../feed/FeedReplicationWorkflowBuilder.java | 7 +-
 .../feed/HCatReplicationWorkflowBuilder.java | 34 ++-
 .../process/SparkProcessWorkflowBuilder.java | 55 +++--
 .../falcon/workflow/FalconPostProcessing.java | 15 ++
 .../workflow/engine/OozieWorkflowEngine.java | 16 +-
 .../resources/action/feed/eviction-action.xml | 2 +-
 .../main/resources/action/feed/table-export.xml | 1 -
 .../main/resources/action/feed/table-import.xml | 1 -
 .../feed/OozieFeedWorkflowBuilderTest.java | 8 -
 .../OozieProcessWorkflowBuilderTest.java | 77 ++++++-
 .../workflow/FalconPostProcessingTest.java | 3 +
 .../config/process/spark-sql-process.xml | 53 +++++
 pom.xml | 31 +--
 prism/pom.xml | 12 +-
 .../plugin/GraphiteNotificationPlugin.java | 35 +--
 .../falcon/resource/AbstractEntityManager.java | 11 +-
 .../org/apache/falcon/util/EmbeddedServer.java | 1 +
 .../resource/metadata/MetadataTestContext.java | 4 +
 release-docs/0.10/CHANGES.0.10.md | 220 +++++++++++++++++++
 replication/pom.xml | 7 +-
 .../falcon/replication/FeedReplicator.java | 19 +-
 .../falcon/replication/FeedReplicatorTest.java | 22 +-
 rerun/pom.xml | 2 +-
 .../falcon/rerun/handler/RetryConsumer.java | 6 +-
 .../falcon/rerun/handler/RetryHandler.java | 12 +-
 retention/pom.xml | 2 +-
 .../falcon/retention/FeedEvictorTest.java | 53 +++++
 scheduler/pom.xml | 14 +-
 .../workflow/engine/FalconWorkflowEngine.java | 15 +-
 src/bin/graphdbutil.sh | 118 ++++++++++
 src/conf/hbase-site.xml.template | 2 +-
 src/conf/startup.properties | 31 +--
 src/main/assemblies/distributed-package.xml | 7 +
 src/main/assemblies/standalone-package.xml | 7 +
 test-tools/hadoop-webapp/pom.xml | 2 +-
 test-tools/hcatalog-sharelib/pom.xml | 2 +-
 test-tools/hive-sharelib/pom.xml | 2 +-
 test-tools/oozie-sharelib/pom.xml | 2 +-
 test-tools/pig-sharelib/pom.xml | 2 +-
 test-tools/pom.xml | 2 +-
 test-util/pom.xml | 2 +-
 titan/pom.xml | 7 +-
 unit/pom.xml | 2 +-
 webapp/pom.xml | 5 +-
 .../falcon/resource/ExtensionManagerIT.java | 9 +-
 .../InstanceSchedulerManagerJerseyIT.java | 13 +-
 .../resource/ProcessInstanceManagerIT.java | 17 +-
 .../org/apache/falcon/resource/TestContext.java | 15 ++
 webapp/src/test/resources/startup.properties | 4 +-
 172 files changed, 2526 insertions(+), 588 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8fb8186..767c5a2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,60 +1,14 @@
 Apache Falcon Change log
-Trunk
-  TASKS:
-  INCOMPATIBLE CHANGES
-  NEW FEATURES
-    FALCON-1627 Provider integration with Azure Data Factory pipelines (Ying Zheng, Venkat Ranganathan, Sowmya Ramesh)
-
-    FALCON-1664 Add Postgres support for native scheduler(Deepak Barr via Pragya Mittal)
-
-    FALCON-1495 In instance status list, show all runs for instances when requested by user(Narayan Periwal via Ajay Yadava)
-
-    FALCON-1230 Data based notification Service to notify execution instances when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava)
-
-  IMPROVEMENTS
-    FALCON-1584 Falcon allows invalid hadoop queue name for schedulable feed entities (Venkatesan Ramachandran via Balu Vellanki)
-
-    FALCON-1774 Falcon to honour PRISM_URL env var (Praveen Adlakha)
-
-    FALCON-1721 Checkstyle doesn't extend parent.
-
-    FALCON-1818 Minor doc update for tar package locations after FALCON-1751 (Deepak Barr)
-
-    FALCON-1729 Database ingest to support password alias via keystore file (Venkatesan Ramachandran via Balu Vellanki)
-
-    FALCON-1751 Support assembly:single mojo(Ruoyu Wang via Ajay Yadava)
-
-    FALCON-763 Support feed listing for CatalogStorage (Balu Vellanki)
-
-    FALCON-1764 Remove temporary folder "localhost" created during tests(Praveen Adlakha via Ajay Yadava)
-
-    FALCON-1756 Remove PID files on service stop(Deepak Barr via Ajay Yadava)
-
-    FALCON-1771 Tool to merge pull requests (Ajay Yadava)
-
-    FALCON-1770 Update README file (Ajay Yadava)
-
-  BUG FIXES
-    FALCON-1842 Falcon build failed in Jenkins at OozieFeedWorkflowBuilderTest (Balu Vellanki)
-
-    FALCON-887 Support for multiple lib paths in falcon process (Sowmya Ramesh)
-
-    FALCON-1795 Kill api not killing waiting/ready instances
-
-    FALCON-1804 Non-SLA feed throws NullPointerException.
-
-    FALCON-1806 Update documentation for Import and Export. (Venkatesan Ramachandran via Balu Vellanki)
-
-    FALCON-1787 Ooozie pig-action.xml requires hive sharelib for HCatalog use(Sowmya Ramesh via Ajay Yadava)
-
-    FALCON-1792 Upgrade hadoop.version to 2.6.2 (Venkatesan Ramachandran via Peeyush Bishnoi)
-
-    FALCON-1796 [HOTFIX] Incorrect parent pom in distro module(Ajay Yadava)
-    FALCON-1779 Fix rat-check failure in trunk (Ajay Yadava)
+Future Releases:
+For changes in releases after 0.10, please refer to "release-docs" or
+https://github.com/apache/falcon/tree/master/release-docs/

-Proposed Release Version: 0.9
+Release Version: 0.10
+For changes in release 0.10, please refer to "release-docs" or
+https://github.com/apache/falcon/blob/0.10/release-docs/0.10/CHANGES.0.10.md

+Release Version: 0.9
 TASKS:
     FALCON-1778 Check licenses and copyright information (Pallavi Rao)


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/Installation-steps.txt
----------------------------------------------------------------------
diff --git a/Installation-steps.txt b/Installation-steps.txt
index b86d6a1..84f0c99 100644
--- a/Installation-steps.txt
+++ b/Installation-steps.txt
@@ -41,10 +41,12 @@ a. Building falcon from the source release
 * export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=256m -noverify" && mvn clean install
   [optionally -Dhadoop.version=<<hadoop.version>> can be appended to build for a specific version of hadoop]

-*Note:* Falcon drops support for Hadoop-1 and only supports Hadoop-2 from Falcon 0.6 onwards
-        Falcon build with JDK 1.7 using -noverify option
-        To compile Falcon with Hive Replication, optionally "-P hadoop-2,hivedr" can be appended. For this
-        Hive >= 1.2.0 and Oozie >= 4.2.0 should be available.
+*Note 1:* Falcon drops support for Hadoop-1 and only supports Hadoop-2 from Falcon 0.6 onwards
+          Falcon build with JDK 1.7 using -noverify option
+*Note 2:* To compile Falcon with addon extensions, append additional profiles to build command using syntax -P<<profile1,profile2>>
+          For Hive Mirroring extension, use profile "hivedr". Hive >= 1.2.0 and Oozie >= 4.2.0 are required
+          For HDFS Snapshot mirroring extension, use profile "hdfs-snapshot-mirroring". Hadoop >= 2.7.0 is required
+          For ADF integration, use profile "adf"

 b. Building falcon from the source repository

@@ -55,10 +57,12 @@ b. Building falcon from the source repository
 * export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=256m -noverify" && mvn clean install
   [optionally -Dhadoop.version=<<hadoop.version>> can be appended to build for a specific version of hadoop]

-*Note:* Falcon drops support for Hadoop-1 and only supports Hadoop-2 from Falcon 0.6 onwards
-        Falcon build with JDK 1.7 using -noverify option
-        To compile Falcon with Hive Replication, optionally "-P hadoop-2,hivedr" can be appended. For this
-        Hive >= 1.2.0 and Oozie >= 4.2.0 should be available.
+*Note 1:* Falcon drops support for Hadoop-1 and only supports Hadoop-2 from Falcon 0.6 onwards
+          Falcon build with JDK 1.7 using -noverify option
+*Note 2:* To compile Falcon with addon extensions, append additional profiles to build command using syntax -P<<profile1,profile2>>
+          For Hive Mirroring extension, use profile "hivedr". Hive >= 1.2.0 and Oozie >= 4.2.0 are required
+          For HDFS Snapshot mirroring extension, use profile "hdfs-snapshot-mirroring". Hadoop >= 2.7.0 is required
+          For ADF integration, use profile "adf"

 2. Deploying Falcon

@@ -118,8 +122,8 @@ c. Using Falcon
 ~~~~~~~~~~~~~~~

 * bin/falcon admin -version
-  Falcon server build version: {Version:"0.3-SNAPSHOT-rd7e2be9afa2a5dc96acd1ec9e325f39c6b2f17f7",Mode:"embedded"}
-
+Falcon server build version: {"properties":[{"key":"Version","value":"0.10-rbe02edf0d5b10af27bbac694e536bef30885c00e"},
+{"key":"Mode","value":"embedded"},{"key":"authentication","value":"simple"},{"key":"safemode","value":"false"}]}

 * bin/falcon help (for more details about falcon cli usage)


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index 80f8aa2..f61385f 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -238,8 +238,24 @@ MIT license. For details, see docs/license/angular-ui-router-LICENSE.txt
 This product bundles jasmine 2.0.2 which is available under a MIT
 license. For details, see docs/license/jasmine-LICENSE.txt
-
 This product bundles entypo icons which is available under a CC BY-SA
 license and Font is available under SIL license. For details, see
 docs/license/entypo-icons-LICENSE.txt and
-docs/license/entypo-font-LICENSE.txt
+docs/license/entypo-font-LICENSE.txt. (Entypo pictograms by Daniel
+Bruce - www.entypo.com)
+
+This product bundles Cabin webfont [regular, italic & bold] under SIL
+license. For details see docs/license/cabin-font-LICENSE.txt.
+(www.impallari.com & www.ikern.com)
+
+This product bundles normalize v3.0.1 which is available under
+MIT license. For details see docs/license/normalize-LICENSE.txt
+
+This product bundles animate v3.2.5 which is available under
+MIT license. For details see docs/license/animate-LICENSE.txt
+
+This product bundles ngTagsInput v2.3.0 which is available under
+MIT license. For details see docs/license/ngTagsInput-LICENSE.txt
+
+This product bundles ngMask v3.1.1 which is available under
+MIT license. For details see docs/license/ngMask-LICENSE.txt


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index c9259e8..7521153 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -1,6 +1,13 @@
 Apache Falcon
-Copyright 2011-2015 The Apache Software Foundation
+Copyright 2011-2016 The Apache Software Foundation

 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
+
+*****************
+CC BY-SA 3.0
+*****************
+
+The following binary components are provided under CC BY-SA 3.0
+    entypo-icon (Entypo pictograms by Daniel Bruce - www.entypo.com)


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/acquisition/pom.xml
----------------------------------------------------------------------
diff --git a/acquisition/pom.xml b/acquisition/pom.xml
index 38ea259..78bbc5f 100644
--- a/acquisition/pom.xml
+++ b/acquisition/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
     </parent>
     <artifactId>falcon-acquisition</artifactId>
     <description>Apache Falcon Acquisition Module</description>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/adf/pom.xml
----------------------------------------------------------------------
diff --git a/addons/adf/pom.xml b/addons/adf/pom.xml
index 0042f5c..dc0988a 100644
--- a/addons/adf/pom.xml
+++ b/addons/adf/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <artifactId>falcon-adf</artifactId>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/designer/pom.xml
----------------------------------------------------------------------
diff --git a/addons/designer/pom.xml b/addons/designer/pom.xml
index 4be24c3..a6922df 100644
--- a/addons/designer/pom.xml
+++ b/addons/designer/pom.xml
@@ -21,7 +21,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.falcon.designer</groupId>
    <artifactId>designer-main</artifactId>
-    <version>0.6-SNAPSHOT</version>
+    <version>0.10</version>
     <description>Apache Falcon Pipeline Designer</description>
     <name>Apache Falcon Pipeline Designer</name>
     <packaging>pom</packaging>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/extensions/hdfs-mirroring/pom.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-mirroring/pom.xml b/addons/extensions/hdfs-mirroring/pom.xml
index cb9304e..bfaf425 100644
--- a/addons/extensions/hdfs-mirroring/pom.xml
+++ b/addons/extensions/hdfs-mirroring/pom.xml
@@ -25,7 +25,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.falcon.extensions</groupId>
     <artifactId>falcon-hdfs-mirroring-extension</artifactId>
-    <version>0.10-SNAPSHOT</version>
+    <version>0.10</version>
     <description>Apache Falcon sample Hdfs mirroring extension</description>
     <name>Apache Falcon sample Hdfs mirroring extension</name>
     <packaging>jar</packaging>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
index 1e2282c..c0504fb 100644
--- a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
+++ b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
@@ -51,6 +51,10 @@
                 <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
                 <value>${sourceClusterFS},${targetClusterFS}</value>
             </property>
+            <property>
+                <name>mapreduce.job.hdfs-servers</name>
+                <value>${sourceClusterFS},${targetClusterFS}</value>
+            </property>
         </configuration>
         <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
         <arg>-Dmapred.job.queue.name=${queueName}</arg>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/extensions/hdfs-snapshot-mirroring/pom.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/pom.xml b/addons/extensions/hdfs-snapshot-mirroring/pom.xml
index b0b4819..7aaee3d 100644
--- a/addons/extensions/hdfs-snapshot-mirroring/pom.xml
+++ b/addons/extensions/hdfs-snapshot-mirroring/pom.xml
@@ -25,7 +25,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.falcon.extensions</groupId>
     <artifactId>falcon-hdfs-snapshot-mirroring-extension</artifactId>
-    <version>0.10-SNAPSHOT</version>
+    <version>0.10</version>
     <description>Apache Falcon HDFS Snapshot Mirroring Extension</description>
     <name>Apache Falcon Sample HDFS Snapshot Mirroring Extension</name>
     <packaging>jar</packaging>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/extensions/hive-mirroring/pom.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/pom.xml b/addons/extensions/hive-mirroring/pom.xml
index adfb0be..9d08835 100644
--- a/addons/extensions/hive-mirroring/pom.xml
+++ b/addons/extensions/hive-mirroring/pom.xml
@@ -25,7 +25,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.falcon.extensions</groupId>
     <artifactId>falcon-hive-mirroring-extension</artifactId>
-    <version>0.10-SNAPSHOT</version>
+    <version>0.10</version>
     <description>Apache Falcon sample Hive mirroring extension</description>
     <name>Apache Falcon sample Hive mirroring extension</name>
     <packaging>jar</packaging>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
index 4bf048f..63e9a67 100644
--- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
+++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
@@ -96,18 +96,16 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-sourceCluster</arg>
             <arg>${sourceCluster}</arg>
             <arg>-sourceMetastoreUri</arg>
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -144,8 +142,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>lastevents</arg>
         </java>
@@ -190,8 +190,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -202,10 +200,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -244,8 +242,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>export</arg>
             <arg>-counterLogDir</arg>
@@ -292,8 +292,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -304,10 +302,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -346,8 +344,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>import</arg>
         </java>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
index 9f9bf92..4f6eec5 100644
--- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
+++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
@@ -46,18 +46,16 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-sourceCluster</arg>
             <arg>${sourceCluster}</arg>
             <arg>-sourceMetastoreUri</arg>
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -80,8 +78,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>lastevents</arg>
         </java>
@@ -118,8 +118,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -130,10 +128,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -158,8 +156,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>export</arg>
             <arg>-counterLogDir</arg>
@@ -198,8 +198,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -210,10 +208,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -238,8 +236,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>import</arg>
         </java>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hdfs-snapshot-mirroring/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/pom.xml b/addons/hdfs-snapshot-mirroring/pom.xml
index d37185f..6d1ef87 100644
--- a/addons/hdfs-snapshot-mirroring/pom.xml
+++ b/addons/hdfs-snapshot-mirroring/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <artifactId>falcon-hdfs-snapshot-mirroring</artifactId>
@@ -85,56 +85,47 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>compile</scope>
             <version>${hadoop.version}</version>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-common</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-server-nodemanager</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-distcp</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
index 2e41cc0..6f5defe 100644
--- a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
@@ -73,8 +73,12 @@ public class HdfsSnapshotReplicator extends Configured implements Tool {
         String sourceStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName());
         String targetStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName());

-        DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd);
-        DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd);
+        // Always add to getConf() so that configuration set by oozie action is
+        // available when creating DistributedFileSystem.
+        DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd,
+                new Configuration(getConf()));
+        DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd,
+                new Configuration(getConf()));

         String currentSnapshotName = HdfsSnapshotUtil.SNAPSHOT_PREFIX
                 + cmd.getOptionValue(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName())


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
index 22e3377..a50e770 100644
--- a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
@@ -60,8 +60,10 @@ public class HdfsSnapshotEvictor extends Configured implements Tool {
     @Override
     public int run(String[] args) throws Exception {
         CommandLine cmd = getCommand(args);
-        DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd);
-        DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd);
+        DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd,
+                new Configuration(getConf()));
+        DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd,
+                new Configuration(getConf()));

         String sourceDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName());
         String targetDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName());


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
index 5196791..88f6fd9 100644
--- a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.snapshots.util;

 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
@@ -37,29 +38,33 @@ public final class HdfsSnapshotUtil {

     private HdfsSnapshotUtil() {}

-    public static DistributedFileSystem getSourceFileSystem(CommandLine cmd) throws FalconException {
+    public static DistributedFileSystem getSourceFileSystem(CommandLine cmd,
+                                                            Configuration conf) throws FalconException {
         String sourceStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName());
         String sourceExecuteEndpoint = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName());
         String sourcePrincipal = parseKerberosPrincipal(cmd.getOptionValue(
                 HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName()));
-        Configuration sourceConf = ClusterHelper.getConfiguration(sourceStorageUrl,
+
+        Configuration sourceConf = ClusterHelper.getConfiguration(conf, sourceStorageUrl,
                 sourceExecuteEndpoint, sourcePrincipal);
         return HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
     }

-    public static DistributedFileSystem getTargetFileSystem(CommandLine cmd) throws FalconException {
+    public static DistributedFileSystem getTargetFileSystem(CommandLine cmd,
+                                                            Configuration conf) throws FalconException {
         String targetStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName());
         String taregtExecuteEndpoint = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName());
         String targetPrincipal = parseKerberosPrincipal(cmd.getOptionValue(
                 HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName()));

-        Configuration targetConf = ClusterHelper.getConfiguration(targetStorageUrl,
+        Configuration targetConf = ClusterHelper.getConfiguration(conf, targetStorageUrl,
                 taregtExecuteEndpoint, targetPrincipal);
         return HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
     }

     public static String parseKerberosPrincipal(String principal) {
-        if (principal.equals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL)) {
+        if (StringUtils.isEmpty(principal)
+                || principal.equals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL)) {
             return null;
         }
         return principal;


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
index 7924214..fe7ced5 100644
--- a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
+++ b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
@@ -25,7 +25,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
-import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -72,6 +72,7 @@ public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator {

     @BeforeClass
     public void init() throws Exception {
+        this.setConf(new Configuration());
         baseDir = Files.createTempDirectory("test_snapshot-replication").toFile().getAbsoluteFile();
         miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_REPL_TEST_PORT, baseDir);
         miniDfs = miniDFSCluster.getFileSystem();
@@ -100,14 +101,13 @@ public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator {

     @Test
     public void replicationTest() throws Exception {
-        Configuration sourceConf = ClusterHelper.getConfiguration(sourceCluster);
-        this.setConf(sourceConf);
-        Configuration targetConf = ClusterHelper.getConfiguration(targetCluster);
         sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
         targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);

-        DistributedFileSystem sourceFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
-        DistributedFileSystem targetFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
+        DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd,
+                new Configuration(getConf()));
+        DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd,
+                new Configuration(getConf()));

         // create dir1, create snapshot, invoke copy, check file in target, create snapshot on target
         Path dir1 = new Path(sourceDir, "dir1");


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
index f380012..e2f0c7f 100644
--- a/addons/hivedr/pom.xml
+++ b/addons/hivedr/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <artifactId>falcon-hive-replication</artifactId>
@@ -37,56 +37,47 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>compile</scope>
             <version>${hadoop.version}</version>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-common</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-server-nodemanager</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-distcp</artifactId>
-            <scope>compile</scope>
         </dependency>

         <dependency>


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 71b9043..d891487 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -30,8 +30,9 @@ public enum HiveDRArgs {
     SOURCE_CLUSTER("sourceCluster", "source cluster"),
     SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"),
     SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"),
-    SOURCE_DATABASE("sourceDatabase", "comma source databases"),
-    SOURCE_TABLE("sourceTable", "comma source tables"),
+    SOURCE_DATABASES("sourceDatabases", "comma source databases"),
+    SOURCE_DATABASE("sourceDatabase", "First source database"),
+    SOURCE_TABLES("sourceTables", "comma source tables"),
     SOURCE_STAGING_PATH("sourceStagingPath", "source staging path for data", false),

     // source hadoop endpoints
@@ -70,7 +71,7 @@ public enum HiveDRArgs {
     // Map Bandwidth
     DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false),

-    JOB_NAME("jobName", "unique job name"),
+    JOB_NAME("hiveJobName", "unique job name"),

     CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"),
     JOB_CLUSTER_NN("clusterForJobRunWriteEP", "write end point of cluster where job runs"),


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
index 0096727..215be35 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
@@ -63,21 +63,29 @@ public class HiveDROptions {
     }

     public List<String> getSourceDatabases() {
-        return Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASE).trim().split(","));
+        return Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASES).trim().split(","));
     }

     public List<String> getSourceTables() {
-        return Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLE).trim().split(","));
+        return Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLES).trim().split(","));
     }

     public String getSourceStagingPath() {
+        return context.get(HiveDRArgs.SOURCE_STAGING_PATH);
+    }
+
+
+    public void setSourceStagingPath() {
         String stagingPath = context.get(HiveDRArgs.SOURCE_STAGING_PATH);
-        if (StringUtils.isNotBlank(stagingPath)) {
-            stagingPath = StringUtils.removeEnd(stagingPath, File.separator);
-            return stagingPath + File.separator + getJobName();
+        String srcStagingPath;
+        if ("NA".equalsIgnoreCase(stagingPath)) {
+            stagingPath = StringUtils.removeEnd(FileUtils.DEFAULT_EVENT_STORE_PATH, File.separator);
+            srcStagingPath = stagingPath + File.separator + getJobName();
         } else {
-            return FileUtils.DEFAULT_EVENT_STORE_PATH + getJobName();
+            stagingPath = StringUtils.removeEnd(stagingPath, File.separator);
+            srcStagingPath = stagingPath + File.separator + getJobName();
         }
+        context.put(HiveDRArgs.SOURCE_STAGING_PATH, srcStagingPath);
     }

     public String getSourceWriteEP() {
@@ -109,13 +117,20 @@ public class HiveDROptions {
     }

     public String getTargetStagingPath() {
+        return context.get(HiveDRArgs.TARGET_STAGING_PATH);
+    }
+
+    public void setTargetStagingPath() {
         String stagingPath = context.get(HiveDRArgs.TARGET_STAGING_PATH);
-        if (StringUtils.isNotBlank(stagingPath)) {
-            stagingPath = StringUtils.removeEnd(stagingPath, File.separator);
-            return stagingPath + File.separator + getJobName();
+        String targetStagingPath;
+        if ("NA".equalsIgnoreCase(stagingPath)) {
+            stagingPath = StringUtils.removeEnd(FileUtils.DEFAULT_EVENT_STORE_PATH, File.separator);
+            targetStagingPath = stagingPath + File.separator + getJobName();
         } else {
-            return FileUtils.DEFAULT_EVENT_STORE_PATH + getJobName();
+            stagingPath = StringUtils.removeEnd(stagingPath, File.separator);
+            targetStagingPath = stagingPath + File.separator + getJobName();
         }
+        context.put(HiveDRArgs.TARGET_STAGING_PATH, targetStagingPath);
     }

     public String getReplicationMaxMaps() {
@@ -151,7 +166,7 @@ public class HiveDROptions {
     }

     public static HiveDROptions create(String[] args) throws ParseException {
-        Map<HiveDRArgs, String> options = new HashMap<HiveDRArgs, String>();
+        Map<HiveDRArgs, String> options = new HashMap<>();

         CommandLine cmd = getCommand(args);
         for (HiveDRArgs arg : HiveDRArgs.values()) {


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index 17eec22..e45b0d8 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -136,6 +136,13 @@ public class HiveDRTool extends Configured implements Tool {
         inputOptions = parseOptions(args);
         LOG.info("Input Options: {}", inputOptions);

+        // Update the source staging path
+        inputOptions.setSourceStagingPath();
+        inputOptions.setTargetStagingPath();
+
+        LOG.info("srcStaginPath: {}", inputOptions.getSourceStagingPath());
+        LOG.info("tgtStaginPath: {}", inputOptions.getTargetStagingPath());
+
         Configuration sourceConf = FileUtils.getConfiguration(inputOptions.getSourceWriteEP(),
                 inputOptions.getSourceNNKerberosPrincipal());
         sourceClusterFS = FileSystem.get(sourceConf);


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
index bae6c9e..a603deb 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
@@ -71,8 +71,8 @@ public class LastReplicatedEvents {
         FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));

         if (!jobFS.exists(dir)) {
-            if (!jobFS.mkdirs(dir)) {
-                throw new Exception("Creating directory failed: " + dir);
+            if (!FileSystem.mkdirs(jobFS, dir, FileUtils.DEFAULT_DIR_PERMISSION)) {
+                throw new IOException("Creating directory failed: " + dir);
             }
         }


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index 08e0551..5cd7e74 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.concurrent.ScheduledThreadPoolExecutor;

 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;

 /**
  * Map class for Hive DR.
@@ -40,6 +42,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
     private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);

     private EventUtils eventUtils;
+    ScheduledThreadPoolExecutor timer;

     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -54,15 +57,22 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {

     @Override
     protected void map(LongWritable key, Text value,
-                       Context context) throws IOException, InterruptedException {
+                       final Context context) throws IOException, InterruptedException {
         LOG.debug("Processing Event value: {}", value.toString());
-
+        timer = new ScheduledThreadPoolExecutor(1);
+        timer.scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                System.out.println("Hive DR copy mapper progress heart beat");
+                context.progress();
+            }
+        }, 0, 30, TimeUnit.SECONDS);
         try {
             eventUtils.processEvents(value.toString());
         } catch (Exception e) {
             LOG.error("Exception in processing events:", e);
             throw new IOException(e);
         } finally {
+            timer.shutdownNow();
             cleanup(context);
         }
         List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();
@@ -75,7 +85,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
         // In case of export stage, populate custom counters
         if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
                 .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())
-                && !eventUtils.isCountersMapEmtpy()) {
+                && !eventUtils.isCountersMapEmpty()) {
             context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment(
                     eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName()));
             context.getCounter(ReplicationJobCountersList.COPY).increment(


http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
index 50cb4b2..f4bb31c 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -35,12 +35,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

 /**
  * Reducer class for Hive DR.
*/ public class CopyReducer extends Reducer<Text, Text, Text, Text> { private DRStatusStore hiveDRStore; + private ScheduledThreadPoolExecutor timer; @Override protected void setup(Context context) throws IOException, InterruptedException { @@ -62,9 +65,18 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> { } @Override - protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + protected void reduce(Text key, Iterable<Text> values, final Context context) + throws IOException, InterruptedException { List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>(); ReplicationStatus rs; + timer = new ScheduledThreadPoolExecutor(1); + timer.scheduleAtFixedRate(new Runnable() { + public void run() { + System.out.println("Hive DR copy reducer progress heart beat"); + context.progress(); + } + }, 0, 30, TimeUnit.SECONDS); + try { for (Text value : values) { String[] fields = (value.toString()).split("\t"); @@ -76,6 +88,8 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> { hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList)); } catch (HiveReplicationException e) { throw new IOException(e); + } finally { + timer.shutdownNow(); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java index fb695d0..3d3badf 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java @@ -63,8 +63,8 @@ public class EventSourcerUtils { FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH)); if (!jobFS.exists(dir)) { - if (!jobFS.mkdirs(dir)) { - throw new Exception("Creating directory failed: " + dir); + if (!FileSystem.mkdirs(jobFS, dir, FileUtils.DEFAULT_DIR_PERMISSION)) { + throw new IOException("Creating directory failed: " + dir); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java index 3b088f7..590a7e3 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java @@ -37,7 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; -import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.sql.Connection; @@ -95,17 +94,15 @@ public class EventUtils { sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName()); sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName()); sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName()); - sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName()) - + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName()); + sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName()); jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName()); jobNNKerberosPrincipal = 
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
index fb695d0..3d3badf 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
@@ -63,8 +63,8 @@ public class EventSourcerUtils {
 
         FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));
         if (!jobFS.exists(dir)) {
-            if (!jobFS.mkdirs(dir)) {
-                throw new Exception("Creating directory failed: " + dir);
+            if (!FileSystem.mkdirs(jobFS, dir, FileUtils.DEFAULT_DIR_PERMISSION)) {
+                throw new IOException("Creating directory failed: " + dir);
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index 3b088f7..590a7e3 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.sql.Connection;
@@ -95,17 +94,15 @@ public class EventUtils {
         sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
         sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
         sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
-        sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName())
-                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
+        sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName());
         jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
         jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
         targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
-        targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName())
-                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
+        targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName());
         targetNN = conf.get(HiveDRArgs.TARGET_NN.getName());
         targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
-        sourceCleanUpList = new ArrayList<Path>();
-        targetCleanUpList = new ArrayList<Path>();
+        sourceCleanUpList = new ArrayList<>();
+        targetCleanUpList = new ArrayList<>();
         countersMap = new HashMap<>();
     }
@@ -169,7 +166,7 @@ public class EventUtils {
     }
 
     public void processEvents(String event) throws Exception {
-        listReplicationStatus = new ArrayList<ReplicationStatus>();
+        listReplicationStatus = new ArrayList<>();
         String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM);
         String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8");
         String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8");
@@ -203,7 +200,7 @@ public class EventUtils {
                                         List<Path> cleanUpList, boolean isImportStatements)
         throws SQLException, HiveReplicationException, IOException {
         String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM);
-        List<Command> deserializeCommand = new ArrayList<Command>();
+        List<Command> deserializeCommand = new ArrayList<>();
         for (String command : commandList) {
             Command cmd = ReplicationUtils.deserializeCommand(command);
             deserializeCommand.add(cmd);
@@ -269,7 +266,7 @@ public class EventUtils {
     }
 
     private static List<Path> getCleanUpPaths(List<String> cleanupLocations) {
-        List<Path> cleanupLocationPaths = new ArrayList<Path>();
+        List<Path> cleanupLocationPaths = new ArrayList<>();
         for (String cleanupLocation : cleanupLocations) {
             cleanupLocationPaths.add(new Path(cleanupLocation));
         }
@@ -330,7 +327,7 @@ public class EventUtils {
 
     public DistCpOptions getDistCpOptions() {
         // DistCpOptions expects the first argument to be a file OR a list of Paths
-        List<Path> sourceUris=new ArrayList<Path>();
+        List<Path> sourceUris=new ArrayList<>();
         sourceUris.add(new Path(sourceStagingUri));
 
         DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri));
@@ -350,8 +347,8 @@ public class EventUtils {
         return countersMap.get(counterKey);
     }
 
-    public boolean isCountersMapEmtpy() {
-        return countersMap.size() == 0 ? true : false;
+    public boolean isCountersMapEmpty() {
+        return countersMap.size() == 0;
     }
 
     public void cleanEventsDirectory() throws IOException {
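
Two patterns are worth noting in the EventSourcerUtils/EventUtils hunks: the per-job suffix is no longer glued onto the staging paths, and directories are now created through the static FileSystem.mkdirs(fs, dir, permission) overload, which applies the permission explicitly instead of leaving the result to the process umask. A short sketch of that creation idiom, assuming a reachable HDFS; the path and class name are hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    // Illustrative only; the path below is hypothetical.
    public final class MkdirsWithPermission {
        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            Path dir = new Path("/tmp/falcon-events-example");
            FsPermission perm770 = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
            // The static overload creates the directory and then applies the
            // permission, so the result does not depend on the process umask.
            if (!fs.exists(dir) && !FileSystem.mkdirs(fs, dir, perm770)) {
                throw new IOException("Creating directory failed: " + dir);
            }
        }
    }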
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
index 001d10a..8b5c865 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
@@ -33,9 +33,11 @@ import java.io.IOException;
  */
 public final class FileUtils {
 
-    public static final String DEFAULT_EVENT_STORE_PATH = DRStatusStore.BASE_DEFAULT_STORE_PATH
-            + File.separator + "Events";
+    public static final String DEFAULT_EVENT_STORE_PATH = StringUtils.removeEnd(DRStatusStore
+            .BASE_DEFAULT_STORE_PATH, File.separator) + File.separator + "Events" + File.separator;
     public static final FsPermission FS_PERMISSION_700 =
             new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+    public static final FsPermission DEFAULT_DIR_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
 
     private FileUtils() {}
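
The rewritten DEFAULT_EVENT_STORE_PATH tolerates a base path defined with or without a trailing separator. A sketch of the same normalization idiom; commons-lang StringUtils is assumed (lang3 shown, commons-lang 2.x has the identical method), and the base path and names are made up:

    import java.io.File;
    import org.apache.commons.lang3.StringUtils; // commons-lang 2.x works the same way
    import org.apache.hadoop.fs.Path;

    // Illustrative names and base path; only the normalization idiom matters.
    public final class StorePathExample {
        static final String BASE = "/apps/data-mirroring/"; // may or may not end with "/"
        static final String STORE = StringUtils.removeEnd(BASE, File.separator)
                + File.separator + "hiveReplicationStatusStore" + File.separator;

        static Path statusDir(String database, String jobName) {
            // new Path(parent, child) composes cleanly regardless of trailing slashes
            return new Path(new Path(STORE, database.toLowerCase()), jobName);
        }

        public static void main(String[] args) {
            System.out.println(statusDir("SalesDB", "drJob")); // .../salesdb/drJob
        }
    }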
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
index 900afe8..44f0989 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,7 +49,10 @@ public class HiveDRStatusStore extends DRStatusStore {
     private static final Logger LOG = LoggerFactory.getLogger(DRStatusStore.class);
     private FileSystem fileSystem;
 
-    private static final String DEFAULT_STORE_PATH = BASE_DEFAULT_STORE_PATH + "hiveReplicationStatusStore/";
+    private static final String DEFAULT_STORE_PATH = StringUtils.removeEnd
+            (DRStatusStore.BASE_DEFAULT_STORE_PATH, File.separator) + File.separator
+            + "hiveReplicationStatusStore" + File.separator;
+
     private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION =
             new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
 
@@ -71,6 +75,8 @@ public class HiveDRStatusStore extends DRStatusStore {
         Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
         FileUtils.validatePath(fileSystem, basePath);
 
+        // Current limitation is that only users who belong to DRStatusStore.storeGroup can submit HiveDR jobs.
+        // BaseDir for status store is created with permissions 770 so that all eligible users can access statusStore.
         Path storePath = new Path(DEFAULT_STORE_PATH);
         if (!fileSystem.exists(storePath)) {
             if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
@@ -163,10 +169,11 @@ public class HiveDRStatusStore extends DRStatusStore {
     private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
                                                        String database) throws HiveReplicationException{
         DBReplicationStatus dbReplicationStatus = null;
+        Path statusDbDirPath = getStatusDbDirPath(database);
         Path statusDirPath = getStatusDirPath(database, jobName);
+
         // check if database name or jobName can contain chars not allowed by hdfs dir/file naming.
         // if yes, use md5 of the same for dir names. prefer to use actual db names for readability.
-
         try {
             if (fileSystem.exists(statusDirPath)) {
                 dbReplicationStatus = readStatusFile(statusDirPath);
@@ -176,6 +183,15 @@ public class HiveDRStatusStore extends DRStatusStore {
                 ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName, database,
                         null, ReplicationStatus.Status.INIT, -1);
                 dbReplicationStatus = new DBReplicationStatus(initDbStatus);
+
+                // Create parent dir first with default status store permissions. FALCON-2057
+                if (!fileSystem.exists(statusDbDirPath)) {
+                    if (!FileSystem.mkdirs(fileSystem, statusDbDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
+                        String error = "mkdir failed for " + statusDbDirPath.toString();
+                        LOG.error(error);
+                        throw new HiveReplicationException(error);
+                    }
+                }
                 if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
                     String error = "mkdir failed for " + statusDirPath.toString();
                     LOG.error(error);
@@ -197,7 +213,11 @@ public class HiveDRStatusStore extends DRStatusStore {
     }
 
     public Path getStatusDirPath(String database, String jobName) {
-        return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName);
+        return new Path(getStatusDbDirPath(database), jobName);
+    }
+
+    public Path getStatusDbDirPath(String dbName) {
+        return new Path(new Path(BASE_DEFAULT_STORE_PATH), dbName.toLowerCase());
     }
 
     private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
@@ -271,7 +291,7 @@ public class HiveDRStatusStore extends DRStatusStore {
     public void checkForReplicationConflict(String newSource, String jobName,
                                             String database, String table) throws HiveReplicationException {
         try {
-            Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json");
+            Path globPath = new Path(getStatusDbDirPath(database), "*" + File.separator + "latest.json");
             FileStatus[] files = fileSystem.globStatus(globPath);
             for(FileStatus file : files) {
                 DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
index d5d3bc5..b21acc7 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
@@ -70,13 +70,16 @@ public final class HiveDRUtils {
 
     public static Configuration getDefaultConf() throws IOException {
         Configuration conf = new Configuration();
-        Path confPath = new Path("file:///", System.getProperty("oozie.action.conf.xml"));
-        final boolean actionConfExists = confPath.getFileSystem(conf).exists(confPath);
-        LOG.info("Oozie Action conf {} found ? {}", confPath, actionConfExists);
-        if (actionConfExists) {
-            LOG.info("Oozie Action conf found, adding path={}, conf={}", confPath, conf.toString());
-            conf.addResource(confPath);
+        if (System.getProperty("oozie.action.conf.xml") != null) {
+            Path confPath = new Path("file:///", System.getProperty("oozie.action.conf.xml"));
+
+            final boolean actionConfExists = confPath.getFileSystem(conf).exists(confPath);
+            LOG.info("Oozie Action conf {} found ? {}", confPath, actionConfExists);
+            if (actionConfExists) {
+                LOG.info("Oozie Action conf found, adding path={}, conf={}", confPath, conf.toString());
+                conf.addResource(confPath);
+            }
         }
 
         String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
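
The HiveDRUtils guard fixes a failure when the tool runs outside an Oozie launcher: System.getProperty("oozie.action.conf.xml") returns null there, and constructing a Path from a null child throws. A compact sketch of the guarded lookup, using only stock Hadoop classes (the class name is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: consult the Oozie action configuration if and only if
    // the launcher actually set the property; otherwise fall back to defaults.
    public final class DefaultConfExample {
        public static Configuration get() throws IOException {
            Configuration conf = new Configuration();
            String actionConf = System.getProperty("oozie.action.conf.xml");
            if (actionConf != null) { // null when not launched by Oozie
                Path confPath = new Path("file:///", actionConf);
                if (confPath.getFileSystem(conf).exists(confPath)) {
                    conf.addResource(confPath);
                }
            }
            return conf;
        }
    }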
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
index 1f44b62..a9c5661 100644
--- a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
@@ -25,8 +25,8 @@ public class DRTest {
     public void testHiveDr(String[] args) {
         String[] testArgs = {
             "-sourceMetastoreUri", "thrift://localhost:9083",
-            "-sourceDatabase", "default",
-            "-sourceTable", "test",
+            "-sourceDatabases", "default",
+            "-sourceTables", "test",
             "-sourceStagingPath", "/apps/hive/tools/dr",
             "-sourceNN", "hdfs://localhost:8020",
             "-sourceRM", "local",

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/archival/pom.xml
----------------------------------------------------------------------
diff --git a/archival/pom.xml b/archival/pom.xml
index b117d9d..dcace9d 100644
--- a/archival/pom.xml
+++ b/archival/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
     </parent>
     <artifactId>falcon-archival</artifactId>
     <description>Apache Falcon Archival Module</description>

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/build-tools/pom.xml b/build-tools/pom.xml
index 6c8801e..8abdef6 100644
--- a/build-tools/pom.xml
+++ b/build-tools/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
     </parent>
     <groupId>org.apache.falcon</groupId>
     <artifactId>build-tools</artifactId>

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/cli/pom.xml
----------------------------------------------------------------------
diff --git a/cli/pom.xml b/cli/pom.xml
index e77cb46..8acb905 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
     </parent>
     <artifactId>falcon-cli</artifactId>
     <description>Apache Falcon CLI client</description>
@@ -39,7 +39,6 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
-            <scope>compile</scope>
         </dependency>
 
         <dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 8bc77fa..3eb0a8b 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
     </parent>
     <artifactId>falcon-client</artifactId>
     <description>Apache Falcon Java client</description>
@@ -117,6 +117,7 @@
                             <goal>copy-dependencies</goal>
                         </goals>
                         <configuration>
+                            <excludeScope>provided</excludeScope>
                             <outputDirectory>${project.build.directory}/dependency</outputDirectory>
                         </configuration>
                     </execution>

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index debb615..846202c 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>org.apache.falcon</groupId>
         <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
+        <version>0.10</version>
     </parent>
     <artifactId>falcon-common</artifactId>
     <description>Apache Falcon Common Module</description>

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 9e16fa4..f89def3 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -75,9 +75,8 @@ public final class ClusterHelper {
         return conf;
     }
 
-    public static Configuration getConfiguration(String storageUrl, String executeEndPoint,
-                                                 String kerberosPrincipal) {
-        Configuration conf = new Configuration();
+    public static Configuration getConfiguration(Configuration conf, String storageUrl,
+                                                 String executeEndPoint, String kerberosPrincipal) {
         conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
         conf.set(HadoopClientFactory.MR_JT_ADDRESS_KEY, executeEndPoint);
         conf.set(HadoopClientFactory.YARN_RM_ADDRESS_KEY, executeEndPoint);
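
The ClusterHelper signature change threads the caller's Configuration through instead of allocating a fresh one, so resources the caller already loaded survive the call. A rough sketch of the idea; the literal key strings below stand in for the HadoopClientFactory constants used above:

    import org.apache.hadoop.conf.Configuration;

    // Illustrative only; the literal keys stand in for Falcon's
    // HadoopClientFactory constants.
    public final class EndpointConf {
        public static Configuration withEndpoints(Configuration conf, String storageUrl,
                                                  String executeEndPoint) {
            // Mutate the caller's conf rather than "new Configuration()", so any
            // *-site.xml resources the caller already added are kept.
            conf.set("fs.defaultFS", storageUrl);
            conf.set("yarn.resourcemanager.address", executeEndPoint);
            return conf;
        }
    }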
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index ece8b5d..eb15585 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -358,13 +358,20 @@ public class FileSystemStorage extends Configured implements Storage {
 
     private FileStatus[] findFilesForFeed(FileSystem fs, String feedBasePath) throws IOException {
         Matcher matcher = FeedDataPath.PATTERN.matcher(feedBasePath);
+        boolean regexMatchFound = false;
         while (matcher.find()) {
+            regexMatchFound = true;
             String var = feedBasePath.substring(matcher.start(), matcher.end());
             feedBasePath = feedBasePath.replaceAll(Pattern.quote(var), "*");
             matcher = FeedDataPath.PATTERN.matcher(feedBasePath);
         }
-        LOG.info("Searching for {}", feedBasePath);
-        return fs.globStatus(new Path(feedBasePath));
+        if (regexMatchFound) {
+            LOG.info("Searching for {}", feedBasePath);
+            return fs.globStatus(new Path(feedBasePath));
+        } else {
+            LOG.info("Ignoring static path {}", feedBasePath);
+            return null;
+        }
     }
 
     private boolean isDateInRange(Date date, Date start) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index f0d6073..c58be64 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -34,9 +34,6 @@ import java.util.regex.Pattern;
 public class WorkflowNameBuilder<T extends Entity> {
     private static final String PREFIX = "FALCON";
 
-    // Oozie JMS message property name that holds the workflow app name
-    private static final String OOZIE_JMS_MSG_APPNAME_PROP = "appName";
-
     private T entity;
     private Tag tag;
     private List<String> suffixes;
@@ -156,9 +153,5 @@ public class WorkflowNameBuilder<T extends Entity> {
             }
             return null;
         }
-
-        public static String getJMSFalconSelector() {
-            return String.format("%s like '%s%s%%'", OOZIE_JMS_MSG_APPNAME_PROP, PREFIX, SEPARATOR);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 96ba748..3f35962 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -45,8 +45,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -236,14 +234,13 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     }
 
     protected void validateSparkMasterInterface(Cluster cluster) throws ValidationException {
-        final String sparkMasterUrl = ClusterHelper.getSparkMasterEndPoint(cluster);
-        if (StringUtils.isNotEmpty(sparkMasterUrl)) {
-            SparkConf sparkConf = new SparkConf();
-            sparkConf.setMaster(sparkMasterUrl).setAppName("Falcon Spark");
-
-            JavaSparkContext sc = new JavaSparkContext(sparkConf);
-            if (sc.startTime() == null) {
-                throw new ValidationException("Unable to reach Spark master URL:" + sparkMasterUrl);
+        final String sparkMasterEndPoint = ClusterHelper.getSparkMasterEndPoint(cluster);
+        LOG.info("Validating spark interface: {}", sparkMasterEndPoint);
+        if (StringUtils.isNotEmpty(sparkMasterEndPoint)) {
+            if (!("yarn-cluster".equalsIgnoreCase(sparkMasterEndPoint)
+                    || "yarn-client".equalsIgnoreCase(sparkMasterEndPoint)
+                    || "local".equalsIgnoreCase(sparkMasterEndPoint))) {
+                throw new ValidationException("Invalid Spark Interface End Point:" + sparkMasterEndPoint);
             }
         }
     }
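
The parser no longer spins up a JavaSparkContext just to probe the master URL; it validates the endpoint against the execution modes Falcon supports. An equivalent allow-list check as a standalone sketch, with the exception type simplified to IllegalArgumentException and the class name invented for illustration:

    import java.util.Arrays;
    import java.util.List;

    // Illustrative stand-in: same allow-list logic, generic exception type.
    public final class SparkMasterCheck {
        private static final List<String> ALLOWED =
                Arrays.asList("yarn-cluster", "yarn-client", "local");

        static void validate(String sparkMasterEndPoint) {
            if (sparkMasterEndPoint != null && !sparkMasterEndPoint.isEmpty()
                    && !ALLOWED.contains(sparkMasterEndPoint.toLowerCase())) {
                throw new IllegalArgumentException(
                        "Invalid Spark Interface End Point:" + sparkMasterEndPoint);
            }
        }
    }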
