[ https://issues.apache.org/jira/browse/GOBBLIN-853?focusedWorklogId=316885&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316885 ]
ASF GitHub Bot logged work on GOBBLIN-853:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Sep/19 18:05
            Start Date: 23/Sep/19 18:05
    Worklog Time Spent: 10m
      Work Description: sv2000 commented on pull request #2709: [GOBBLIN-853] Support multiple paths specified in flow config
URL: https://github.com/apache/incubator-gobblin/pull/2709#discussion_r327250309

 ##########
 File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 ##########
 @@ -197,6 +210,87 @@ public void awaitHealthy() throws InterruptedException {
      return jobExecutionPlanDag;
    }

 +  /**
 +   * If {@link FlowSpec} has {@link #DATASET_SUBPATHS_KEY}, split it into multiple flowSpecs using a provided base input
 +   * and base output path to generate multiple source/destination paths.
 +   */
 +  private static List<FlowSpec> splitFlowSpec(FlowSpec flowSpec) {
 +    long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
 +    List<FlowSpec> flowSpecs = new ArrayList<>();
 +
 +    if (flowSpec.getConfig().hasPath(DATASET_SUBPATHS_KEY)) {
 +      List<String> datasetSubpaths = ConfigUtils.getStringList(flowSpec.getConfig(), DATASET_SUBPATHS_KEY);
 +      String baseInputPath = ConfigUtils.getString(flowSpec.getConfig(), DATASET_BASE_INPUT_PATH_KEY, "/");
 +      String baseOutputPath = ConfigUtils.getString(flowSpec.getConfig(), DATASET_BASE_OUTPUT_PATH_KEY, "/");
 +
 +      for (String subPath : datasetSubpaths) {
 +        Config newConfig = flowSpec.getConfig().withoutPath("dataset.subPaths")
 +            .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
 +            .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
 +                ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
 +            .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
 +                ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
 +        flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
 +      }
 +    } else {
 +      return splitFlowSpecByNumber(flowSpec);
 +    }
 +
 +    return flowSpecs;
 +  }
 +
 +  /**
 +   * If {@link FlowSpec} has config keys like configKey.0, configKey.1, split it into multiple flowSpecs on these properties.
 +   * Properties that do not specify numbers will be present in all returned flowSpecs.
 +   */
 +  private static List<FlowSpec> splitFlowSpecByNumber(FlowSpec flowSpec) {

 Review comment:
   We probably should revisit this later. IIUC, it looks like there are 2 possible ways to specify multi-dataset flows, and the base-path/sub-path is a special case of splitting flowspecs by number. Better to focus on the common use case first i.e. the one where we have a common base path with different sub-paths.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 316885)
    Time Spent: 1h  (was: 50m)

> Support multiple paths specified in flow config
> -----------------------------------------------
>
>                 Key: GOBBLIN-853
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-853
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Jack Moseley
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
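
For readers following the review comment, here is a minimal sketch of the base-path/sub-path case it describes: one shared config is expanded into one config per sub-path, each with its own input and output path. This is not the code from the pull request. It uses plain `java.util` maps instead of Typesafe `Config`, a simple string join as a stand-in for `new Path(base, subPath)`, and the class name, the `join` helper, and the `INPUT_PATH_KEY` / `OUTPUT_PATH_KEY` strings are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SubPathSplitSketch {
  // Illustrative key names only; the real constants are defined in the PR.
  static final String SUBPATHS_KEY = "dataset.subPaths";
  static final String INPUT_PATH_KEY = "input.dataset.path";   // hypothetical
  static final String OUTPUT_PATH_KEY = "output.dataset.path"; // hypothetical

  // Simplified stand-in for path resolution: joins base and sub with exactly one "/".
  static String join(String base, String sub) {
    String b = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
    String s = sub.startsWith("/") ? sub.substring(1) : sub;
    return b + "/" + s;
  }

  // Produces one config per sub-path. Shared properties are copied into every
  // result, the sub-path list itself is dropped, and the per-dataset input and
  // output paths are derived from the two base paths.
  static List<Map<String, String>> split(Map<String, String> shared,
      String baseInputPath, String baseOutputPath, List<String> subPaths) {
    List<Map<String, String>> result = new ArrayList<>();
    for (String subPath : subPaths) {
      Map<String, String> copy = new LinkedHashMap<>(shared);
      copy.remove(SUBPATHS_KEY);
      copy.put(INPUT_PATH_KEY, join(baseInputPath, subPath));
      copy.put(OUTPUT_PATH_KEY, join(baseOutputPath, subPath));
      result.add(copy);
    }
    return result;
  }
}
```

Under these assumptions, calling `split` with base paths `/data/in` and `/data/out` and sub-paths `a` and `b/c` yields two configs, which is the "common base path with different sub-paths" use case the reviewer suggests focusing on first.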