Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e1716b527 -> ce78e619a


[GOBBLIN-609] Change config store to append to path part of URI

Closes #2475 from jack-moseley/config-store-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ce78e619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ce78e619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ce78e619

Branch: refs/heads/master
Commit: ce78e619a36eb8a2268ee4294969f4b258596aa9
Parents: e1716b5
Author: Jack Moseley <[email protected]>
Authored: Wed Oct 10 10:59:56 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Wed Oct 10 10:59:56 2018 -0700

----------------------------------------------------------------------
 .../config/store/zip/IvyConfigStoreFactory.java | 25 ++++++++++++++------
 .../extract/kafka/ConfigStoreUtils.java         |  9 +++----
 2 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce78e619/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
index dba7d3a..4b7286a 100644
--- a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
+++ b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.config.store.zip;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
 import java.nio.file.FileSystems;
 import java.nio.file.Paths;
@@ -50,6 +51,7 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt
   private static final String IVY_SCHEME_PREFIX = "ivy-";
   private static final String ORG_KEY = "org";
   private static final String MODULE_KEY = "module";
+  private static final String STORE_PATH_KEY = "storePath";
   private static final String STORE_PREFIX_KEY = "storePrefix";
 
   @Override
@@ -64,11 +66,12 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt
   /**
    * Example configKey URI (configuration is passed as part of the query)
    *
-   * ivy-hdfs://<hdfsURI>/path/to/config/store?org=<jarOrg>&module=<jarModule>&storePrefix=_CONFIG_STORE
+   * ivy-hdfs:/<relativePath>?org=<jarOrg>&module=<jarModule>&storePath=/path/to/hdfs/store&storePrefix=_CONFIG_STORE
    *
    * ivy-hdfs: scheme for this factory
-   * hdfsURI/path/to/config/store: location of HDFS config store (used for getting current version)
+   * relativePath: config key path within the jar
    * org/module: org and module of jar containing config store
+   * storePath: location of HDFS config store (used for getting current version)
    * storePrefix: prefix to paths in config store
    */
   @Override
@@ -84,14 +87,15 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt
 
     String jarOrg = factoryProps.getProperty(ORG_KEY);
     String jarModule = factoryProps.getProperty(MODULE_KEY);
+    String storePath = factoryProps.getProperty(STORE_PATH_KEY);
 
-    if (jarOrg == null || jarModule == null) {
-      throw new ConfigStoreCreationException(configKey, "Config key URI must contain org and module to download from");
+    if (jarOrg == null || jarModule == null || storePath == null) {
+      throw new ConfigStoreCreationException(configKey, "Config key URI must contain org, module, and storePath");
     }
 
     try {
       SimpleHDFSStoreMetadata metadata = new SimpleHDFSStoreMetadata(
-          org.apache.hadoop.fs.FileSystem.get(new Configuration()), new Path(configKey.getPath(),
+          org.apache.hadoop.fs.FileSystem.get(new Configuration()), new Path(storePath,
           SimpleHadoopFilesystemConfigStore.CONFIG_STORE_NAME));
       String currentVersion = metadata.getCurrentVersion();
 
@@ -107,10 +111,17 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt
         throw new ConfigStoreCreationException(configKey, "Downloaded file must be a zip or jar file");
       }
 
-      return new ZipFileConfigStore((ZipFileSystem) zipFs, configKey, currentVersion, factoryProps.getProperty(STORE_PREFIX_KEY, ""));
-    } catch (IOException e) {
+      return new ZipFileConfigStore((ZipFileSystem) zipFs, getBaseURI(configKey), currentVersion, factoryProps.getProperty(STORE_PREFIX_KEY, ""));
+    } catch (IOException | URISyntaxException e) {
       throw new ConfigStoreCreationException(configKey, e);
     }
   }
+
+  /**
+   * Base URI for a config store should be root of the zip file, so change path part of URI to be null
+   */
+  private URI getBaseURI(URI configKey) throws URISyntaxException {
+    return new URI(configKey.getScheme(), configKey.getAuthority(), null, configKey.getQuery(), configKey.getFragment());
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce78e619/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
index 4eea468..a05438c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
@@ -81,10 +81,11 @@ public class ConfigStoreUtils {
 
   public static URI getUriStringForTopic(String topicName, String commonPath, String configStoreUri)
       throws URISyntaxException {
-    Path path =
-        PathUtils.mergePaths(new Path(configStoreUri), PathUtils.mergePaths(new Path(commonPath), new Path(topicName)));
-    log.info("URI for topic is : " + path.toString());
-    return new URI(path.toString());
+    URI storeUri = new URI(configStoreUri);
+    Path path = PathUtils.mergePaths(new Path(storeUri.getPath()), PathUtils.mergePaths(new Path(commonPath), new Path(topicName)));
+    URI topicUri = new URI(storeUri.getScheme(), storeUri.getAuthority(), path.toString(), storeUri.getQuery(), storeUri.getFragment());
+    log.info("URI for topic is : " + topicUri.toString());
+    return topicUri;
   }
 
   public static Optional<Config> getConfigForTopic(Properties properties, String topicKey, ConfigClient configClient) {

Reply via email to