Repository: incubator-gobblin Updated Branches: refs/heads/master 520bcf5df -> 14c1b03bb
[GOBBLIN-251] Updated UpdateProviderFactory to be able to instantiate UpdateProvider with specified URI Closes #2102 from jinhyukchang/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/14c1b03b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/14c1b03b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/14c1b03b Branch: refs/heads/master Commit: 14c1b03bb6e59832eaf830bb06ef9f1b7ad293a8 Parents: 520bcf5 Author: Jin Hyuk Chang <[email protected]> Authored: Wed Sep 13 11:14:02 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Wed Sep 13 11:14:19 2017 -0700 ---------------------------------------------------------------------- .../hive/provider/UpdateProviderFactory.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/14c1b03b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/provider/UpdateProviderFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/provider/UpdateProviderFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/provider/UpdateProviderFactory.java index 15794ac..715921a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/provider/UpdateProviderFactory.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/provider/UpdateProviderFactory.java @@ -18,8 +18,11 @@ package org.apache.gobblin.data.management.conversion.hive.provider; import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.net.URI; import java.util.Properties; +import lombok.extern.slf4j.Slf4j; + import org.apache.hadoop.fs.FileSystem; import com.google.common.collect.ImmutableList; @@ -33,17 +36,20 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; * A factory class to create {@link HiveUnitUpdateProvider}s */ @Alpha +@Slf4j public class UpdateProviderFactory { private static final String OPTIONAL_HIVE_UNIT_UPDATE_PROVIDER_CLASS_KEY = "hive.unit.updateProvider.class"; private static final String DEFAULT_HIVE_UNIT_UPDATE_PROVIDER_CLASS = HdfsBasedUpdateProvider.class .getName(); + static final String UPDATE_PROVIDER_FS_URI = "hive.unit.updateProvider.fs.uri"; + public static HiveUnitUpdateProvider create(State state) { try { return (HiveUnitUpdateProvider) GobblinConstructorUtils.invokeFirstConstructor(Class.forName(state.getProp( OPTIONAL_HIVE_UNIT_UPDATE_PROVIDER_CLASS_KEY, DEFAULT_HIVE_UNIT_UPDATE_PROVIDER_CLASS)), - ImmutableList.<Object>of(FileSystem.get(HadoopUtils.getConfFromState(state))), ImmutableList.of()); + ImmutableList.<Object>of(getFileSystem(state.getProperties())), ImmutableList.of()); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException | IOException e) { throw new RuntimeException(e); @@ -55,10 +61,19 @@ public class UpdateProviderFactory { return (HiveUnitUpdateProvider) GobblinConstructorUtils.invokeFirstConstructor(Class.forName(properties .getProperty( OPTIONAL_HIVE_UNIT_UPDATE_PROVIDER_CLASS_KEY, DEFAULT_HIVE_UNIT_UPDATE_PROVIDER_CLASS)), - ImmutableList.<Object>of(FileSystem.get(HadoopUtils.getConfFromProperties(properties))), ImmutableList.of()); + ImmutableList.<Object>of(getFileSystem(properties)), ImmutableList.of()); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException | IOException e) { throw new RuntimeException(e); } } + + private static FileSystem getFileSystem(Properties properties) throws IOException { + String uri = properties.getProperty(UPDATE_PROVIDER_FS_URI); + if (uri == null) { + return FileSystem.get(HadoopUtils.getConfFromProperties(properties)); + } + log.info("Using file system URI {}", uri); + return FileSystem.get(URI.create(uri), HadoopUtils.getConfFromProperties(properties)); + } }
