OOZIE-2701 Oozie to support Multiple HCatalog URIs (abhishekbafna)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/89be33bb Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/89be33bb Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/89be33bb Branch: refs/heads/oya Commit: 89be33bb134285faa7d22d756ef34f56ef490e5f Parents: ce7eb31 Author: abhisek bafna <[email protected]> Authored: Thu Apr 6 19:40:55 2017 +0530 Committer: abhisek bafna <[email protected]> Committed: Thu Apr 6 19:40:55 2017 +0530 ---------------------------------------------------------------------- .../org/apache/oozie/coord/HCatELFunctions.java | 15 ++-- .../apache/oozie/dependency/HCatURIHandler.java | 12 ++- core/src/main/resources/oozie-default.xml | 6 ++ .../apache/oozie/util/TestHCatURIParser.java | 87 ++++++++++++++++++++ .../src/site/twiki/DG_HCatalogIntegration.twiki | 33 ++++++++ release-log.txt | 1 + .../java/org/apache/oozie/util/HCatURI.java | 21 +++++ .../org/apache/oozie/util/HCatURIParser.java | 48 +++++++++++ .../java/org/apache/oozie/util/TestHCatURI.java | 21 +++-- 9 files changed, 227 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java b/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java index 9475f72..f40f406 100644 --- a/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java +++ b/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java @@ -20,15 +20,18 @@ package org.apache.oozie.coord; import java.net.URI; import java.net.URISyntaxException; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.DagELFunctions; import org.apache.oozie.client.WorkflowJob; import 
org.apache.oozie.dependency.URIHandler; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.Services; import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.HCatURI; +import org.apache.oozie.util.HCatURIParser; import org.apache.oozie.util.XLog; /** @@ -37,6 +40,8 @@ import org.apache.oozie.util.XLog; public class HCatELFunctions { private static final Configuration EMPTY_CONF = new Configuration(true); + private static final String HCAT_URI_REGEX_CONFIG = ConfigurationService.get("oozie.hcat.uri.regex.pattern"); + private static final Pattern HCAT_URI_PATTERN = Pattern.compile(HCAT_URI_REGEX_CONFIG); enum EventType { input, output @@ -291,7 +296,7 @@ public class HCatELFunctions { String partitionValue = null; if (uri != null) { if (type.equals("hive-export")) { - String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR); + String[] uriList = HCatURIParser.splitHCatUris(uri, HCAT_URI_PATTERN); if (uriList.length > 1) { throw new RuntimeException("Multiple partitions not supported for hive-export type. 
Dataset name: " + dataInName + " URI: " + uri); @@ -331,7 +336,7 @@ public class HCatELFunctions { } String minPartition = null; if (uris != null) { - String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); + String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN); // get the partition values list and find minimum try { // initialize minValue with first partition value @@ -376,7 +381,7 @@ public class HCatELFunctions { } String maxPartition = null; if (uris != null) { - String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); + String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN); // get the partition values list and find minimum try { // initialize minValue with first partition value @@ -403,7 +408,7 @@ public class HCatELFunctions { } private static String createPartitionFilter(String uris, String type) { - String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR); + String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN); StringBuilder filter = new StringBuilder(""); if (uriList.length > 0) { for (String uri : uriList) { @@ -433,7 +438,7 @@ public class HCatELFunctions { uris = (String) eval.getVariable(".dataout." 
+ dataInName); } if (uris != null) { - String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1); + String[] uri = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN); uriTemplate.append(uri[0]); } else { http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java index 4cc284a..c60c811 100644 --- a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java +++ b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java @@ -241,7 +241,7 @@ public class HCatURIHandler implements URIHandler { } - private HiveConf getHiveConf(URI uri, Configuration conf){ + private HiveConf getHiveConf(URI uri, Configuration conf) throws HCatAccessorException { HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); if (hcatService.getHCatConf() != null) { conf = hcatService.getHCatConf(); @@ -319,7 +319,7 @@ public class HCatURIHandler implements URIHandler { } } - private String getMetastoreConnectURI(URI uri) { + private String getMetastoreConnectURI(URI uri) throws HCatAccessorException { String metastoreURI; // For unit tests if (uri.getAuthority().equals("unittest-local")) { @@ -328,7 +328,13 @@ public class HCatURIHandler implements URIHandler { else { // Hardcoding hcat to thrift mapping till support for webhcat(templeton) // is added - metastoreURI = "thrift://" + uri.getAuthority(); + HCatURI hCatURI; + try { + hCatURI = new HCatURI(uri.toString()); + metastoreURI = hCatURI.getServerEndPointWithScheme("thrift"); + } catch (URISyntaxException e) { + throw new HCatAccessorException(ErrorCode.E0902, e); + } } return metastoreURI; } http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/core/src/main/resources/oozie-default.xml 
---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index fe095ca..ae384b4 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -3009,4 +3009,10 @@ will be the requeue interval for the actions which are waiting for a long time w </description> </property> + <property> + <name>oozie.hcat.uri.regex.pattern</name> + <value>([a-z]+://[\w\.\-]+:\d+[,]*)+/\w+/\w+/?[\w+=;\-]*</value> + <description>Regex pattern for HCat URIs. The regex can be modified by users as per requirement + for parsing/splitting the HCat URIs.</description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/core/src/test/java/org/apache/oozie/util/TestHCatURIParser.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/TestHCatURIParser.java b/core/src/test/java/org/apache/oozie/util/TestHCatURIParser.java new file mode 100644 index 0000000..54cda3a --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/TestHCatURIParser.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util; + +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XTestCase; + +import java.net.URI; +import java.util.regex.Pattern; + +public class TestHCatURIParser extends XTestCase { + + private Services services; + private Pattern HCAT_URI_PATTERN; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + String HCAT_URI_REGEX_CONFIG = ConfigurationService.get("oozie.hcat.uri.regex.pattern"); + HCAT_URI_PATTERN = Pattern.compile(HCAT_URI_REGEX_CONFIG); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + services.destroy(); + } + + public void testWhenMultipleHCatURIsAreSplitPartsAreExtractedCorrectly1() { + String uri = "hcat://hostname1:1000,hcat://hostname2:2000/mydb/clicks/datastamp=12;region=us,scheme://hostname3:3000," + + "scheme://hostname4:4000,scheme://hostname5:5000/db/table/p1=12;p2=us,scheme://hostname4:4000/d/t/p=1"; + String[] uris = HCatURIParser.splitHCatUris(uri, HCAT_URI_PATTERN); + assertEquals(3, uris.length); + assertEquals("hcat://hostname1:1000,hcat://hostname2:2000/mydb/clicks/datastamp=12;region=us", uris[0]); + assertEquals("scheme://hostname3:3000,scheme://hostname4:4000,scheme://hostname5:5000/db/table/p1=12;p2=us", uris[1]); + assertEquals("scheme://hostname4:4000/d/t/p=1", uris[2]); + } + + public void testWhenMultipleHCatURIsAreSplitPartsAreExtractedCorrectly2() { + String uri = "thrift://host.name1:1000/mydb/clicks/datastamp=12;region=u_s"; + String[] uris = HCatURIParser.splitHCatUris(uri, HCAT_URI_PATTERN); + assertEquals(1, uris.length); + assertEquals("thrift://host.name1:1000/mydb/clicks/datastamp=12;region=u_s", uris[0]); + } + + public void testWhenMultipleHCatURIsAreSplitPartsAreExtractedCorrectly3() { + String 
uri = "hcat://10.10.10.10:9083/default/invites/ds=2010-01-01;region=usa"; + String[] uris = HCatURIParser.splitHCatUris(uri, HCAT_URI_PATTERN); + assertEquals(1, uris.length); + assertEquals("hcat://10.10.10.10:9083/default/invites/ds=2010-01-01;region=usa", uris[0]); + } + + public void testParsingMultipleHCatServerURI() throws Exception { + String uriStr = "hcat://hcat.server.com:5080,hcat://hcat.server1.com:5080/mydb/clicks/datastamp=12;region=us"; + String[] uris = HCatURIParser.splitHCatUris(uriStr, HCAT_URI_PATTERN); + URI uri = HCatURIParser.parseURI(new URI(uris[0])); + assertEquals("hcat", uri.getScheme()); + assertEquals("hcat.server.com:5080,hcat.server1.com:5080", uri.getAuthority()); + } + + public void testParsingSingleServerURI() throws Exception { + String uriStr = "hdfs://namenode.example.com:8020/path/to/directory/file"; + URI uri = HCatURIParser.parseURI(new URI(uriStr)); + assertEquals("hdfs", uri.getScheme()); + assertEquals("namenode.example.com:8020", uri.getAuthority()); + assertEquals("/path/to/directory/file", uri.getPath()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/docs/src/site/twiki/DG_HCatalogIntegration.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_HCatalogIntegration.twiki b/docs/src/site/twiki/DG_HCatalogIntegration.twiki index 39df67c..d3107b4 100644 --- a/docs/src/site/twiki/DG_HCatalogIntegration.twiki +++ b/docs/src/site/twiki/DG_HCatalogIntegration.twiki @@ -59,9 +59,11 @@ Oozie supports specifying HCatalog partitions as a data dependency through a URI used to identify a set of table partitions: hcat://bar:8020/logsDB/logsTable/dt=20090415;region=US. 
The format to specify a HCatalog table URI is: + hcat://[metastore server]:[port]/[database name]/[table name] The format to specify a HCatalog table partition URI is: + hcat://[metastore server]:[port]/[database name]/[table name]/[partkey1]=[value];[partkey2]=[value];... For example, @@ -74,6 +76,37 @@ For example, </dataset> </verbatim> +After the Oozie 4.3.0 release, Oozie also supports multiple HCatalog servers in the URI. Each server needs to be +separated by a single comma (,). + +The format to specify a HCatalog table partition URI with multiple HCatalog servers is: + +hcat://[metastore_server]:[port],[metastore_server]:[port]/[database_name]/[table_name]/[partkey1]=[value];[partkey2]=[value];... + +For example, +<verbatim> + <dataset name="logs" frequency="${coord:days(1)}" + initial-instance="2009-02-15T08:15Z" timezone="America/Los_Angeles"> + <uri-template> + hcat://myhcatmetastore:9080,myhcatmetastore:9080/database1/table1/datestamp=${YEAR}${MONTH}${DAY}${HOUR};region=USA + </uri-template> + </dataset> +</verbatim> + +The regex for parsing multiple HCatalog URIs is exposed via oozie-site.xml, so users can modify it if there is any +requirement. The key for the regex is: =oozie.hcat.uri.regex.pattern= + +For example, the following has multiple HCatalog URIs with multiple HCatalog servers. To understand this, Oozie will split them into +two HCatalog URIs. For splitting the URIs, the above-mentioned regex is used. 
+ +hcat://hostname1:1000,hcat://hostname2:2000/mydb/clicks/datastamp=12;region=us,scheme://hostname3:3000,scheme://hostname4:4000,scheme://hostname5:5000/db/table/p1=12;p2=us + +After split: (This is internal Oozie mechanism) + +hcat://hostname1:1000,hcat://hostname2:2000/mydb/clicks/datastamp=12;region=us + +scheme://hostname3:3000,scheme://hostname4:4000,scheme://hostname5:5000/db/table/p1=12;p2=us + #HCatalogLibraries ---+++ HCatalog Libraries http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 4ccc9e5..845912c 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.4.0 release (trunk - unreleased) +OOZIE-2701 Oozie to support Multiple HCatalog URIs (abhishekbafna) OOZIE-2850 Fix default callback notifications (asasvari via gezapeti) OOZIE-1283 Remove the old ssh documentation (Jan Hentschel via rkanter) OOZIE-2845 Replace reflection-based code which sets variable in HiveConf (pbacsko via abhishekbafna) http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java ---------------------------------------------------------------------- diff --git a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java index faeff2a..88ad762 100644 --- a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java +++ b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java @@ -55,6 +55,7 @@ public class HCatURI { private void parse(URI uri) throws URISyntaxException { + uri = HCatURIParser.parseURI(uri); this.uri = uri; if (uri.getAuthority() == null) { @@ -107,6 +108,26 @@ public class HCatURI { } /** + * Return server end points with given scheme + * @param scheme uri scheme + * @return server end point with given scheme + */ + public String 
getServerEndPointWithScheme(String scheme) { + String authority = uri.getAuthority(); + String[] authorities = authority.split(","); + StringBuilder builder = new StringBuilder(); + for (String auth : authorities) { + if (builder.length() != 0) { + builder.append(","); + } + builder.append(scheme); + builder.append("://"); + builder.append(auth); + } + return builder.toString(); + } + + /** * @return fully qualified server address */ public String getServerEndPoint() { http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURIParser.java ---------------------------------------------------------------------- diff --git a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURIParser.java b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURIParser.java new file mode 100644 index 0000000..94fc93a --- /dev/null +++ b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURIParser.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class HCatURIParser { + + public static String[] splitHCatUris(String uri, Pattern pattern) { + List<String> list = new ArrayList<>(); + Matcher matcher = pattern.matcher(uri); + while (matcher.find()) { + String s = matcher.group(); + list.add(s); + } + return list.toArray(new String[list.size()]); + } + + static URI parseURI(URI uri) throws URISyntaxException { + String uriStr = uri.toString(); + int index = uriStr.indexOf("://"); + String scheme = uriStr.substring(0, index + 3); + uriStr = uriStr.replaceAll(scheme, ""); + uriStr = scheme.concat(uriStr); + return new URI(uriStr); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java ---------------------------------------------------------------------- diff --git a/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java b/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java index ce690da..eceda81 100644 --- a/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java +++ b/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java @@ -24,26 +24,29 @@ import java.util.LinkedHashMap; import java.util.Map; import org.junit.Test; -import org.apache.oozie.util.HCatURI; public class TestHCatURI { @Test - public void testHCatURIParseValidURI() { + public void testHCatURIParseValidURI() throws URISyntaxException { String input = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us"; - HCatURI uri = null; - try { - uri = new HCatURI(input); - } - catch (Exception ex) { - System.err.print(ex.getMessage()); - } + HCatURI uri = new HCatURI(input); assertEquals(uri.getServerEndPoint(), "hcat://hcat.server.com:5080"); 
assertEquals(uri.getDb(), "mydb"); assertEquals(uri.getTable(), "clicks"); assertEquals(uri.getPartitionValue("datastamp"), "12"); assertEquals(uri.getPartitionValue("region"), "us"); } + @Test + public void whenMultipleHCatURIsAreParsedASingleURIIsExtracted() throws URISyntaxException { + String input = "hcat://hcat.server.com:5080,hcat://hcat.server1.com:5080/mydb/clicks/datastamp=12;region=us"; + HCatURI uri = new HCatURI(input); + assertEquals(uri.getServerEndPointWithScheme("hcat"), "hcat://hcat.server.com:5080,hcat://hcat.server1.com:5080"); + assertEquals(uri.getDb(), "mydb"); + assertEquals(uri.getTable(), "clicks"); + assertEquals(uri.getPartitionValue("datastamp"), "12"); + assertEquals(uri.getPartitionValue("region"), "us"); + } @Test public void testHCatTableURI() {
