Repository: falcon Updated Branches: refs/heads/master da767c2f6 -> b0efc6fbf
FALCON-2072 Hive2 URLs in Falcon should allow additional configuration elements in the URL Author: Sowmya Ramesh <[email protected]> Reviewers: "Venkat Ranganathan <[email protected]>, Balu Vellanki <[email protected]>" Closes #250 from sowmyaramesh/FALCON-2072 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b0efc6fb Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b0efc6fb Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b0efc6fb Branch: refs/heads/master Commit: b0efc6fbf97c109a81b490339d2521c21fffe37e Parents: da767c2 Author: Sowmya Ramesh <[email protected]> Authored: Wed Aug 3 15:06:19 2016 -0700 Committer: bvellanki <[email protected]> Committed: Wed Aug 3 15:06:19 2016 -0700 ---------------------------------------------------------------------- .../main/META/hive-mirroring-properties.json | 16 ++++- .../runtime/hive-mirroring-secure-workflow.xml | 12 ++++ .../runtime/hive-mirroring-workflow.xml | 12 ++++ .../java/org/apache/falcon/hive/HiveDRArgs.java | 2 + .../org/apache/falcon/hive/util/EventUtils.java | 49 ++++++++++++-- .../falcon/hive/util/HiveDRStatusStore.java | 24 +++---- .../apache/falcon/hive/util/EventUtilsTest.java | 71 ++++++++++++++++++++ .../mirroring/hive/HiveMirroringExtension.java | 18 ++--- .../hive/HiveMirroringExtensionProperties.java | 2 + 9 files changed, 178 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json ---------------------------------------------------------------------- diff --git a/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json b/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json index e019e68..686ce94 100644 --- a/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json +++ 
b/addons/extensions/hive-mirroring/src/main/META/hive-mirroring-properties.json @@ -94,10 +94,16 @@ { "propertyName":"sourceHiveServer2Uri", "required":true, - "description":"Hive2 server end point", + "description":"Hive2 server end point. If Zookeeper discovery mode is enabled zookeeper_ensemble is expected", "example":"hive2://localhost:10000" }, { + "propertyName":"sourceHiveServer2ExtraOpts", + "required":false, + "description":"Extra opts required when SSL is enabled, Http mode and when zookeeper discovery is used", + "example":"serviceDiscoveryMode=zooKeeper; zooKeeperNamespace=<hiveserver2_namespace>" + }, + { "propertyName":"sourceDatabases", "required":true, "description":"For DB level replication specify multiple comma separated databases to replicate", @@ -130,10 +136,16 @@ { "propertyName":"targetHiveServer2Uri", "required":true, - "description":"Hive2 server end point", + "description":"Hive2 server end point. If Zookeeper discovery mode is enabled zookeeper_ensemble is expected", "example":"hive2://localhost:10000" }, { + "propertyName":"targetHiveServer2ExtraOpts", + "required":false, + "description":"Extra opts required when SSL is enabled, Http mode and when zookeeper discovery is used", + "example":"serviceDiscoveryMode=zooKeeper; zooKeeperNamespace=<hiveserver2_namespace>" + }, + { "propertyName":"targetStagingPath", "required":false, "description":"Staging path on target", http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml index 63e9a67..6ccea3a 100644 --- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml +++ 
b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml @@ -102,6 +102,8 @@ <arg>${sourceMetastoreUri}</arg> <arg>-sourceHiveServer2Uri</arg> <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceHiveServer2ExtraOpts</arg> + <arg>${sourceHiveServer2ExtraOpts}</arg> <arg>-sourceDatabases</arg> <arg>${sourceDatabases}</arg> <arg>-sourceTables</arg> @@ -122,6 +124,8 @@ <arg>${targetMetastoreUri}</arg> <arg>-targetHiveServer2Uri</arg> <arg>${targetHiveServer2Uri}</arg> + <arg>-targetHiveServer2ExtraOpts</arg> + <arg>${targetHiveServer2ExtraOpts}</arg> <arg>-targetStagingPath</arg> <arg>${targetStagingPath}</arg> <arg>-targetNN</arg> @@ -200,6 +204,8 @@ <arg>${sourceMetastoreUri}</arg> <arg>-sourceHiveServer2Uri</arg> <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceHiveServer2ExtraOpts</arg> + <arg>${sourceHiveServer2ExtraOpts}</arg> <arg>-sourceDatabases</arg> <arg>${sourceDatabases}</arg> <arg>-sourceTables</arg> @@ -220,6 +226,8 @@ <arg>${targetMetastoreUri}</arg> <arg>-targetHiveServer2Uri</arg> <arg>${targetHiveServer2Uri}</arg> + <arg>-targetHiveServer2ExtraOpts</arg> + <arg>${targetHiveServer2ExtraOpts}</arg> <arg>-targetStagingPath</arg> <arg>${targetStagingPath}</arg> <arg>-targetNN</arg> @@ -302,6 +310,8 @@ <arg>${sourceMetastoreUri}</arg> <arg>-sourceHiveServer2Uri</arg> <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceHiveServer2ExtraOpts</arg> + <arg>${sourceHiveServer2ExtraOpts}</arg> <arg>-sourceDatabases</arg> <arg>${sourceDatabases}</arg> <arg>-sourceTables</arg> @@ -322,6 +332,8 @@ <arg>${targetMetastoreUri}</arg> <arg>-targetHiveServer2Uri</arg> <arg>${targetHiveServer2Uri}</arg> + <arg>-targetHiveServer2ExtraOpts</arg> + <arg>${targetHiveServer2ExtraOpts}</arg> <arg>-targetStagingPath</arg> <arg>${targetStagingPath}</arg> <arg>-targetNN</arg> http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml 
---------------------------------------------------------------------- diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml index 4f6eec5..5336bda 100644 --- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml +++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml @@ -52,6 +52,8 @@ <arg>${sourceMetastoreUri}</arg> <arg>-sourceHiveServer2Uri</arg> <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceHiveServer2ExtraOpts</arg> + <arg>${sourceHiveServer2ExtraOpts}</arg> <arg>-sourceDatabases</arg> <arg>${sourceDatabases}</arg> <arg>-sourceTables</arg> @@ -66,6 +68,8 @@ <arg>${targetMetastoreUri}</arg> <arg>-targetHiveServer2Uri</arg> <arg>${targetHiveServer2Uri}</arg> + <arg>-targetHiveServer2ExtraOpts</arg> + <arg>${targetHiveServer2ExtraOpts}</arg> <arg>-targetStagingPath</arg> <arg>${targetStagingPath}</arg> <arg>-targetNN</arg> @@ -128,6 +132,8 @@ <arg>${sourceMetastoreUri}</arg> <arg>-sourceHiveServer2Uri</arg> <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceHiveServer2ExtraOpts</arg> + <arg>${sourceHiveServer2ExtraOpts}</arg> <arg>-sourceDatabases</arg> <arg>${sourceDatabases}</arg> <arg>-sourceTables</arg> @@ -142,6 +148,8 @@ <arg>${targetMetastoreUri}</arg> <arg>-targetHiveServer2Uri</arg> <arg>${targetHiveServer2Uri}</arg> + <arg>-targetHiveServer2ExtraOpts</arg> + <arg>${targetHiveServer2ExtraOpts}</arg> <arg>-targetStagingPath</arg> <arg>${targetStagingPath}</arg> <arg>-targetNN</arg> @@ -208,6 +216,8 @@ <arg>${sourceMetastoreUri}</arg> <arg>-sourceHiveServer2Uri</arg> <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceHiveServer2ExtraOpts</arg> + <arg>${sourceHiveServer2ExtraOpts}</arg> <arg>-sourceDatabases</arg> <arg>${sourceDatabases}</arg> <arg>-sourceTables</arg> @@ -222,6 +232,8 @@ <arg>${targetMetastoreUri}</arg> 
<arg>-targetHiveServer2Uri</arg> <arg>${targetHiveServer2Uri}</arg> + <arg>-targetHiveServer2ExtraOpts</arg> + <arg>${targetHiveServer2ExtraOpts}</arg> <arg>-targetStagingPath</arg> <arg>${targetStagingPath}</arg> <arg>-targetNN</arg> http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java index d891487..9decd30 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java @@ -30,6 +30,7 @@ public enum HiveDRArgs { SOURCE_CLUSTER("sourceCluster", "source cluster"), SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"), SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"), + SOURCE_HS2_URI_EXTRA_OPTS("sourceHiveServer2ExtraOpts", "source HS2 extra opts", false), SOURCE_DATABASES("sourceDatabases", "comma source databases"), SOURCE_DATABASE("sourceDatabase", "First source database"), SOURCE_TABLES("sourceTables", "comma source tables"), @@ -47,6 +48,7 @@ public enum HiveDRArgs { // target meta store details TARGET_METASTORE_URI("targetMetastoreUri", "source meta store uri"), TARGET_HS2_URI("targetHiveServer2Uri", "source meta store uri"), + TARGET_HS2_URI_EXTRA_OPTS("targetHiveServer2ExtraOpts", "target HS2 extra opts", false), TARGET_STAGING_PATH("targetStagingPath", "source staging path for data", false), http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java index 
05b5f96..492c70e 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java @@ -60,12 +60,14 @@ public class EventUtils { private Configuration conf = null; private String sourceHiveServer2Uri = null; + private String sourceHS2UriExtraOptions = null; private String sourceDatabase = null; private String sourceNN = null; private String sourceNNKerberosPrincipal = null; private String jobNN = null; private String jobNNKerberosPrincipal = null; private String targetHiveServer2Uri = null; + private String targetHS2UriExtraOptions = null; private String sourceStagingPath = null; private String targetStagingPath = null; private String targetNN = null; @@ -91,6 +93,7 @@ public class EventUtils { public EventUtils(Configuration conf) { this.conf = conf; sourceHiveServer2Uri = conf.get(HiveDRArgs.SOURCE_HS2_URI.getName()); + sourceHS2UriExtraOptions = conf.get(HiveDRArgs.SOURCE_HS2_URI_EXTRA_OPTS.getName()); sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName()); sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName()); sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName()); @@ -98,6 +101,7 @@ public class EventUtils { jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName()); jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName()); targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName()); + targetHS2UriExtraOptions = conf.get(HiveDRArgs.TARGET_HS2_URI_EXTRA_OPTS.getName()); targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName()); targetNN = conf.get(HiveDRArgs.TARGET_NN.getName()); targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName()); @@ -122,22 +126,55 @@ public class EventUtils { if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName()) .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) { - String connString = JDBC_PREFIX + 
sourceHiveServer2Uri + "/" + sourceDatabase; + String authString = null; if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()))) { - connString += authTokenString; + authString = authTokenString; } + + String connString = getSourceHS2ConnectionUrl(authString); sourceConnection = DriverManager.getConnection(connString, user, password.getProperty("password")); sourceStatement = sourceConnection.createStatement(); } else { - String connString = JDBC_PREFIX + targetHiveServer2Uri + "/" + sourceDatabase; + String authString = null; if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()))) { - connString += authTokenString; + authString = authTokenString; } + String connString = getTargetHS2ConnectionUrl(authString); targetConnection = DriverManager.getConnection(connString, user, password.getProperty("password")); targetStatement = targetConnection.createStatement(); } } + private String getSourceHS2ConnectionUrl(final String authTokenString) { + return getHS2ConnectionUrl(sourceHiveServer2Uri, sourceDatabase, + authTokenString, sourceHS2UriExtraOptions); + } + + private String getTargetHS2ConnectionUrl(final String authTokenString) { + return getHS2ConnectionUrl(targetHiveServer2Uri, sourceDatabase, + authTokenString, targetHS2UriExtraOptions); + } + + public static String getHS2ConnectionUrl(final String hs2Uri, final String database, + final String authTokenString, final String hs2UriExtraOpts) { + StringBuilder connString = new StringBuilder(); + connString.append(JDBC_PREFIX).append(StringUtils.removeEnd(hs2Uri, "/")).append("/").append(database); + + if (StringUtils.isNotBlank(authTokenString)) { + connString.append(authTokenString); + } + + if (StringUtils.isNotBlank(hs2UriExtraOpts) && !("NA".equalsIgnoreCase(hs2UriExtraOpts))) { + if (!hs2UriExtraOpts.startsWith(";")) { + connString.append(";"); + } + connString.append(hs2UriExtraOpts); + } + + LOG.info("getHS2ConnectionUrl connection uri: 
{}", connString); + return connString.toString(); + } + public void initializeFS() throws IOException { LOG.info("Initializing staging directory"); sourceStagingUri = new Path(sourceNN, sourceStagingPath).toString(); @@ -152,7 +189,7 @@ public class EventUtils { BufferedReader in = new BufferedReader(new InputStreamReader(jobFileSystem.open(eventFileName))); try { String line; - while ((line=in.readLine())!=null) { + while ((line = in.readLine()) != null) { eventString.append(line); eventString.append(DelimiterUtils.NEWLINE_DELIM); } @@ -327,7 +364,7 @@ public class EventUtils { public DistCpOptions getDistCpOptions() { // DistCpOptions expects the first argument to be a file OR a list of Paths - List<Path> sourceUris=new ArrayList<>(); + List<Path> sourceUris = new ArrayList<>(); sourceUris.add(new Path(sourceStagingUri)); DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri)); http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java index 44f0989..ee459a3 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java @@ -49,8 +49,8 @@ public class HiveDRStatusStore extends DRStatusStore { private static final Logger LOG = LoggerFactory.getLogger(DRStatusStore.class); private FileSystem fileSystem; - private static final String DEFAULT_STORE_PATH = StringUtils.removeEnd - (DRStatusStore.BASE_DEFAULT_STORE_PATH, File.separator) + File.separator + private static final String DEFAULT_STORE_PATH = StringUtils.removeEnd( + DRStatusStore.BASE_DEFAULT_STORE_PATH, File.separator) + File.separator + 
"hiveReplicationStatusStore" + File.separator; private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION = @@ -90,10 +90,10 @@ public class HiveDRStatusStore extends DRStatusStore { } } - /** - get all DB updated by the job. get all current table statuses for the DB merge the latest repl - status with prev table repl statuses. If all statuses are success, store the status as success - with largest eventId for the DB else store status as failure for the DB and lowest eventId. + /** + * get all DB updated by the job. get all current table statuses for the DB merge the latest repl + * status with prev table repl statuses. If all statuses are success, store the status as success + * with largest eventId for the DB else store status as failure for the DB and lowest eventId. */ @Override public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList) @@ -161,13 +161,13 @@ public class HiveDRStatusStore extends DRStatusStore { } } catch (IOException e) { throw new HiveReplicationException("Failed to delete status for Job " - + jobName + " and DB "+ database, e); + + jobName + " and DB " + database, e); } } private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName, - String database) throws HiveReplicationException{ + String database) throws HiveReplicationException { DBReplicationStatus dbReplicationStatus = null; Path statusDbDirPath = getStatusDbDirPath(database); Path statusDirPath = getStatusDirPath(database, jobName); @@ -253,7 +253,7 @@ public class HiveDRStatusStore extends DRStatusStore { while (fileIterator.hasNext()) { fileList.add(fileIterator.next().getPath().toString()); } - if (fileList.size() > (numFiles+1)) { + if (fileList.size() > (numFiles + 1)) { // delete some files, as long as they are older than the time. 
Collections.sort(fileList); for (String file : fileList.subList(0, (fileList.size() - numFiles + 1))) { @@ -289,11 +289,11 @@ public class HiveDRStatusStore extends DRStatusStore { } public void checkForReplicationConflict(String newSource, String jobName, - String database, String table) throws HiveReplicationException { + String database, String table) throws HiveReplicationException { try { Path globPath = new Path(getStatusDbDirPath(database), "*" + File.separator + "latest.json"); FileStatus[] files = fileSystem.globStatus(globPath); - for(FileStatus file : files) { + for (FileStatus file : files) { DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString( fileSystem.open(file.getPath()))); ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus(); @@ -319,7 +319,7 @@ public class HiveDRStatusStore extends DRStatusStore { allowed as long as the target tables are different. For example, job1 can replicate db1.table1 and job2 can replicate db1.table2. Both jobs cannot replicate to same table. */ - for(Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) { + for (Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) { if (table.equals(entry.getKey())) { throw new HiveReplicationException("Two different jobs are trying to replicate to same table " + entry.getKey() + ". 
New job = " + jobName http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/addons/hivedr/src/test/java/org/apache/falcon/hive/util/EventUtilsTest.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/util/EventUtilsTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/util/EventUtilsTest.java new file mode 100644 index 0000000..2e78519 --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/util/EventUtilsTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Unit tests for EventUtils. 
+ */ +@Test +public class EventUtilsTest { + private static final String JDBC_PREFIX = "jdbc:"; + private static final String HS2_URI = "hive2://localhost:10000:"; + private static final String HS2_ZK_URI = "hive2://host1.com:2181,host2.com:2181/"; + private static final String DATABASE = "test"; + private static final String HS2_SSL_EXTRA_OPTS = "ssl=true;" + + "sslTrustStore=/var/lib/security/keystores/gateway.jks;" + + "trustStorePassword=1234?hive.server2.transport.mode=http;hive.server2.thrift.http" + + ".path=gateway/primaryCLuster/hive"; + private static final String HS2_ZK_EXTRA_OPTS = ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2"; + private static final String AUTHTOKEN_STRING = ";auth=delegationToken"; + public EventUtilsTest() { + } + + @Test + public void validateHs2Uri() { + final String expectedUri = JDBC_PREFIX + HS2_URI + "/" + DATABASE; + + Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, null, null), expectedUri); + Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, null, "NA"), expectedUri); + Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, AUTHTOKEN_STRING, + null), expectedUri + AUTHTOKEN_STRING); + } + + @Test + public void validateHs2UriWhenSSLEnabled() { + final String expectedUri = JDBC_PREFIX + HS2_URI + "/" + DATABASE; + + Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, null, HS2_SSL_EXTRA_OPTS), + expectedUri + ";" + HS2_SSL_EXTRA_OPTS); + Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_URI, DATABASE, AUTHTOKEN_STRING, HS2_SSL_EXTRA_OPTS), + expectedUri + AUTHTOKEN_STRING + ";" + HS2_SSL_EXTRA_OPTS); + } + + @Test + public void validateHs2UriWhenZKDiscoveryEnabled() { + final String expectedUri = JDBC_PREFIX + HS2_ZK_URI + DATABASE; + + Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_ZK_URI, DATABASE, null, HS2_ZK_EXTRA_OPTS), + expectedUri + HS2_ZK_EXTRA_OPTS); + 
Assert.assertEquals(EventUtils.getHS2ConnectionUrl(HS2_ZK_URI, DATABASE, AUTHTOKEN_STRING, HS2_ZK_EXTRA_OPTS), + expectedUri + AUTHTOKEN_STRING + HS2_ZK_EXTRA_OPTS); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java index c3bd7a7..6c7e5da 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java @@ -115,14 +115,13 @@ public class HiveMirroringExtension extends AbstractExtension { } @Override - public Properties getAdditionalProperties(final Properties extensionProperties) throws FalconException { + public Properties getAdditionalProperties(final Properties extensionProperties) + throws FalconException { Properties additionalProperties = new Properties(); - String jobName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName()); // Add job name as Hive DR job additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(), jobName); - // Get the first source DB additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_DATABASE.getName(), extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_DATABASES @@ -235,28 +234,31 @@ public class HiveMirroringExtension extends AbstractExtension { if (StringUtils.isBlank(distcpMaxMaps)) { additionalProperties.put(HiveMirroringExtensionProperties.DISTCP_MAX_MAPS.getName(), "1"); } - String distcpMapBandwidth = extensionProperties.getProperty( HiveMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName()); if 
(StringUtils.isBlank(distcpMapBandwidth)) { additionalProperties.put(HiveMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName(), "100"); } - if (StringUtils.isBlank( extensionProperties.getProperty(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName()))) { additionalProperties.put(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(), "false"); } - if (StringUtils.isBlank( extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName()))) { additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName(), NOT_APPLICABLE); } - if (StringUtils.isBlank( extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName()))) { additionalProperties.put(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName(), NOT_APPLICABLE); } - + if (StringUtils.isBlank( + extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_HS2_EXTRA_OPTS.getName()))) { + additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_HS2_EXTRA_OPTS.getName(), NOT_APPLICABLE); + } + if (StringUtils.isBlank( + extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_HS2_EXTRA_OPTS.getName()))) { + additionalProperties.put(HiveMirroringExtensionProperties.TARGET_HS2_EXTRA_OPTS.getName(), NOT_APPLICABLE); + } return additionalProperties; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/b0efc6fb/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java index 828817b..2276d1c 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java +++ 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java @@ -26,6 +26,7 @@ public enum HiveMirroringExtensionProperties { SOURCE_CLUSTER("sourceCluster", "Replication source cluster name"), SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri", false), SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"), + SOURCE_HS2_EXTRA_OPTS("sourceHiveServer2ExtraOpts", "Source HS2 extra opts", false), SOURCE_DATABASES("sourceDatabases", "List of databases to replicate"), SOURCE_DATABASE("sourceDatabase", "Database to verify the setup connection", false), SOURCE_TABLES("sourceTables", "List of tables to replicate", false), @@ -40,6 +41,7 @@ public enum HiveMirroringExtensionProperties { TARGET_CLUSTER("targetCluster", "Target cluster name"), TARGET_METASTORE_URI("targetMetastoreUri", "Target Hive metastore uri", false), TARGET_HS2_URI("targetHiveServer2Uri", "Target HS2 uri"), + TARGET_HS2_EXTRA_OPTS("targetHiveServer2ExtraOpts", "Target HS2 extra opts", false), TARGET_STAGING_PATH("targetStagingPath", "Location of target staging path", false), TARGET_NN("targetNN", "Target name node", false), TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos principal", false),
