Add ability to stage explicit file list
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b866fef Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b866fef Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b866fef Branch: refs/heads/master Commit: 9b866fef99293d9738f0dcd862fb409265e50abb Parents: 7409ca0 Author: Kenneth Knowles <[email protected]> Authored: Tue Oct 10 21:55:49 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Wed Oct 18 13:02:24 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../beam/runners/dataflow/util/GcsStager.java | 42 +++++++++++++++----- .../beam/runners/dataflow/util/Stager.java | 27 ++++++++++--- 3 files changed, 54 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index e637dd4..5e91850 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -514,7 +514,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications " + "related to Google Compute Engine usage and other Google Cloud Services."); - List<DataflowPackage> packages = options.getStager().stageFiles(); + List<DataflowPackage> packages = 
options.getStager().stageDefaultFiles(); // Set a unique client_request_id in the CreateJob request. http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java index 929be99..ff205f0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java @@ -29,9 +29,7 @@ import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.MimeTypes; -/** - * Utility class for staging files to GCS. - */ +/** Utility class for staging files to GCS. */ public class GcsStager implements Stager { private DataflowPipelineOptions options; @@ -39,32 +37,54 @@ public class GcsStager implements Stager { this.options = options; } - @SuppressWarnings("unused") // used via reflection + @SuppressWarnings("unused") // used via reflection public static GcsStager fromOptions(PipelineOptions options) { return new GcsStager(options.as(DataflowPipelineOptions.class)); } + /** + * Stages {@link DataflowPipelineOptions#getFilesToStage()}, which defaults to every file on the + * classpath unless overridden, as well as {@link + * DataflowPipelineDebugOptions#getOverrideWindmillBinary()} if specified. 
+ * + * @see #stageFiles(List) + */ @Override - public List<DataflowPackage> stageFiles() { + public List<DataflowPackage> stageDefaultFiles() { checkNotNull(options.getStagingLocation()); String windmillBinary = options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary(); + + List<String> filesToStage = options.getFilesToStage(); + if (windmillBinary != null) { - options.getFilesToStage().add("windmill_main=" + windmillBinary); + filesToStage.add("windmill_main=" + windmillBinary); } + return stageFiles(filesToStage); + } + + /** + * Stages files to {@link DataflowPipelineOptions#getStagingLocation()}, suffixed with their md5 + * hash to avoid collisions. + * + * <p>Uses {@link DataflowPipelineOptions#getGcsUploadBufferSizeBytes()}. + */ + @Override + public List<DataflowPackage> stageFiles(List<String> filesToStage) { int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024); checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0"); uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024); - GcsCreateOptions createOptions = GcsCreateOptions.builder() - .setGcsUploadBufferSizeBytes(uploadSizeBytes) - .setMimeType(MimeTypes.BINARY) - .build(); + GcsCreateOptions createOptions = + GcsCreateOptions.builder() + .setGcsUploadBufferSizeBytes(uploadSizeBytes) + .setMimeType(MimeTypes.BINARY) + .build(); try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) { return packageUtil.stageClasspathElements( - options.getFilesToStage(), options.getStagingLocation(), createOptions); + filesToStage, options.getStagingLocation(), createOptions); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/9b866fef/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java index 3e3c17f..f0be941 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java @@ -20,10 +20,27 @@ package org.apache.beam.runners.dataflow.util; import com.google.api.services.dataflow.model.DataflowPackage; import java.util.List; -/** - * Interface for staging files needed for running a Dataflow pipeline. - */ +/** Interface for staging files needed for running a Dataflow pipeline. */ public interface Stager { - /* Stage files and return a list of packages. */ - List<DataflowPackage> stageFiles(); + /** + * Stage default files and return a list of {@link DataflowPackage} objects describing the actual + * location at which each file was staged. + * + * <p>This is required to be identical to calling {@link #stageFiles(List)} with the default set + * of files. + * + * <p>The default is controlled by the implementation of {@link Stager}. The only known + * implementation of stager is {@link GcsStager}. See that class for more detail. + */ + List<DataflowPackage> stageDefaultFiles(); + + /** + * Stage files and return a list of {@link DataflowPackage} objects describing the actual + * location at which each file was staged. + * + * <p>The mechanism for staging is owned by the implementation. The only requirement is that the + * location specified in the returned {@link DataflowPackage} should, in fact, contain the + * contents of the staged file. + */ + List<DataflowPackage> stageFiles(List<String> filesToStage); }
