Repository: incubator-gobblin Updated Branches: refs/heads/master e1716b527 -> ce78e619a
[GOBBLIN-609] Change config store to append to path part of URI Closes #2475 from jack-moseley/config-store-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ce78e619 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ce78e619 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ce78e619 Branch: refs/heads/master Commit: ce78e619a36eb8a2268ee4294969f4b258596aa9 Parents: e1716b5 Author: Jack Moseley <[email protected]> Authored: Wed Oct 10 10:59:56 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Oct 10 10:59:56 2018 -0700 ---------------------------------------------------------------------- .../config/store/zip/IvyConfigStoreFactory.java | 25 ++++++++++++++------ .../extract/kafka/ConfigStoreUtils.java | 9 +++---- 2 files changed, 23 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce78e619/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java index dba7d3a..4b7286a 100644 --- a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java +++ b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java @@ -19,6 +19,7 @@ package org.apache.gobblin.config.store.zip; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.nio.file.FileSystem; import java.nio.file.FileSystems; import java.nio.file.Paths; @@ -50,6 +51,7 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt private static final String IVY_SCHEME_PREFIX = "ivy-"; private static final String ORG_KEY = "org"; private static final String MODULE_KEY = "module"; + private static final String STORE_PATH_KEY = "storePath"; private static final String STORE_PREFIX_KEY = "storePrefix"; @Override @@ -64,11 +66,12 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt /** * Example configKey URI (configuration is passed as part of the query) * - * ivy-hdfs://<hdfsURI>/path/to/config/store?org=<jarOrg>&module=<jarModule>&storePrefix=_CONFIG_STORE + * ivy-hdfs:/<relativePath>?org=<jarOrg>&module=<jarModule>&storePath=/path/to/hdfs/store&storePrefix=_CONFIG_STORE * * ivy-hdfs: scheme for this factory - * hdfsURI/path/to/config/store: location of HDFS config store (used for getting current version) + * relativePath: config key path within the jar * org/module: org and module of jar containing config store + * storePath: location of HDFS config store (used for getting current version) * storePrefix: prefix to paths in config store */ @Override @@ -84,14 +87,15 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt String jarOrg = factoryProps.getProperty(ORG_KEY); String jarModule = factoryProps.getProperty(MODULE_KEY); + String storePath = factoryProps.getProperty(STORE_PATH_KEY); - if (jarOrg == null || jarModule == null) { - throw new ConfigStoreCreationException(configKey, "Config key URI must contain org and module to download from"); + if (jarOrg == null || jarModule == null || storePath == null) { + throw new ConfigStoreCreationException(configKey, "Config key URI must contain org, module, and storePath"); } try { SimpleHDFSStoreMetadata metadata = new SimpleHDFSStoreMetadata( - org.apache.hadoop.fs.FileSystem.get(new Configuration()), new Path(configKey.getPath(), + org.apache.hadoop.fs.FileSystem.get(new Configuration()), new Path(storePath, SimpleHadoopFilesystemConfigStore.CONFIG_STORE_NAME)); String currentVersion = metadata.getCurrentVersion(); @@ -107,10 +111,17 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt throw new ConfigStoreCreationException(configKey, "Downloaded file must be a zip or jar file"); } - return new ZipFileConfigStore((ZipFileSystem) zipFs, configKey, currentVersion, factoryProps.getProperty(STORE_PREFIX_KEY, "")); - } catch (IOException e) { + return new ZipFileConfigStore((ZipFileSystem) zipFs, getBaseURI(configKey), currentVersion, factoryProps.getProperty(STORE_PREFIX_KEY, "")); + } catch (IOException | URISyntaxException e) { throw new ConfigStoreCreationException(configKey, e); } } + + /** + * Base URI for a config store should be root of the zip file, so change path part of URI to be null + */ + private URI getBaseURI(URI configKey) throws URISyntaxException { + return new URI(configKey.getScheme(), configKey.getAuthority(), null, configKey.getQuery(), configKey.getFragment()); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce78e619/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java index 4eea468..a05438c 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java @@ -81,10 +81,11 @@ public class ConfigStoreUtils { public static URI getUriStringForTopic(String topicName, String commonPath, String configStoreUri) throws URISyntaxException { - Path path = - PathUtils.mergePaths(new Path(configStoreUri), PathUtils.mergePaths(new Path(commonPath), new Path(topicName))); - log.info("URI for topic is : " + path.toString()); - return new URI(path.toString()); + URI storeUri = new URI(configStoreUri); + Path path = PathUtils.mergePaths(new Path(storeUri.getPath()), PathUtils.mergePaths(new Path(commonPath), new Path(topicName))); + URI topicUri = new URI(storeUri.getScheme(), storeUri.getAuthority(), path.toString(), storeUri.getQuery(), storeUri.getFragment()); + log.info("URI for topic is : " + topicUri.toString()); + return topicUri; } public static Optional<Config> getConfigForTopic(Properties properties, String topicKey, ConfigClient configClient) {
