Repository: incubator-gobblin Updated Branches: refs/heads/master d8b1a6609 -> b67fd71b1
[GOBBLIN-205] Fix bug in pushmode copyRoute generation Add some docs to help understand and maintain the code Closes #2057 from autumnust/replication-push Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b67fd71b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b67fd71b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b67fd71b Branch: refs/heads/master Commit: b67fd71b1ea6e3eb47a5ac16d192067fb0ad8a68 Parents: d8b1a66 Author: Lei Sun <[email protected]> Authored: Tue Aug 29 14:43:59 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Tue Aug 29 14:43:59 2017 -0700 ---------------------------------------------------------------------- .../copy/replication/ConfigBasedMultiDatasets.java | 15 ++++++++++++++- .../copy/replication/CopyRouteGeneratorBase.java | 7 +++++++ .../copy/replication/DataFlowTopology.java | 10 ++++++++++ .../copy/replication/ReplicationConfiguration.java | 2 ++ 4 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java index 00374e9..3f9a57f 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java @@ -132,14 +132,19 @@ public class 
ConfigBasedMultiDatasets { Optional<List<CopyRoute>> copyRoutes = cpGen.getPushRoutes(rc, pushFrom); if(!copyRoutes.isPresent()) { log.warn("In Push mode, did not found any copyRoute for dataset with meta data {}", rc.getMetaData()); - return; + continue; } + /** + * For-Loop responsibility: + * For each {@link CopyRoute}, generate a {@link ConfigBasedDataset}. + */ for(CopyRoute cr: copyRoutes.get()){ if(cr.getCopyTo() instanceof HadoopFsEndPoint){ HadoopFsEndPoint ep = (HadoopFsEndPoint)cr.getCopyTo(); if(ep.getFsURI().toString().equals(pushModeTargetCluster)){ + // For a candidate dataset, iterate through all available blacklist patterns. ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, this.props, cr); if (blacklistFilteringHelper(configBasedDataset, this.blacklist)){ @@ -164,10 +169,18 @@ public class ConfigBasedMultiDatasets { // PULL mode CopyRouteGenerator cpGen = rc.getCopyRouteGenerator(); + /** + * Replicas come from the 'List' which will normally be set in gobblin.replicas. + * Basically, these are all possible destinations for this replication job. + */ List<EndPoint> replicas = rc.getReplicas(); for(EndPoint replica: replicas){ // Only pull the data from current execution cluster if(needGenerateCopyEntity(replica, executionClusterURI)){ + /* + * A CopyRoute represents a copyable dataset to execute; e.g. if you specify source:[war, holdem], + * there could be two {@link ConfigBasedDataset}s generated.
+ */ Optional<CopyRoute> copyRoute = cpGen.getPullRoute(rc, replica); if(copyRoute.isPresent()){ ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, this.props, copyRoute.get()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java index d732fcf..da88a32 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java @@ -45,11 +45,18 @@ public class CopyRouteGeneratorBase implements CopyRouteGenerator { List<DataFlowTopology.DataFlowPath> paths = topology.getDataFlowPaths(); for (DataFlowTopology.DataFlowPath p : paths) { + /** + * Routes are a list of pairs generated from the config, in the format of the topology specification. + * For example, source:[holdem, war] will end up with + * List<(source, holdem), (source, war)> + */ List<CopyRoute> routes = p.getCopyRoutes(); + if (routes.isEmpty()) { continue; } + // All the routes should have the same copyFrom but different copyTo.
if (routes.get(0).getCopyFrom().equals(copyFrom)) { return Optional.of(routes); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java index 9c8e15b..fc90044 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java @@ -36,6 +36,16 @@ import lombok.AllArgsConstructor;; */ @Data +/** + * Some explanation, illustrated with the example in configStore: + * + * {@link DataFlowTopology} is the whole block specified in, + * say, {@link ReplicationConfiguration#DEFAULT_DATA_FLOW_TOPOLOGIES_PULLMODE}; this block is normally called the topology. + * + * Each path in {@link #dataFlowPaths} represents a line like: tarock:[source,holdem] + * + * From {@link #dataFlowPaths} we can derive a list of {@link CopyRoute}s.
+ */ public class DataFlowTopology { private List<DataFlowPath> dataFlowPaths = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java index 2deafba..b2df7f1 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java @@ -343,6 +343,8 @@ public class ReplicationConfiguration { else { Set<String> currentCopyTo = new HashSet<>(); for (final Map.Entry<String, EndPoint> valid : validEndPoints.entrySet()) { + + // Only generate copyRoute from the EndPoint that is running this job. if (routesConfig.hasPath(valid.getKey())) { List<String> copyToStringsRaw = routesConfig.getStringList(valid.getKey()); List<String> copyToStrings = new ArrayList<>();
