Repository: incubator-beam Updated Branches: refs/heads/master a5548f915 -> 10e628471
[BEAM-50] Fix BigQuery.Write tempFilePrefix concatenation Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8f92b988 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8f92b988 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8f92b988 Branch: refs/heads/master Commit: 8f92b9881b2f8ce24279977facdb5afe9e143521 Parents: a5548f9 Author: Pei He <[email protected]> Authored: Tue Apr 19 17:11:27 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu Apr 21 19:25:13 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/BigQueryIO.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f92b988/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index d9debbd..9239514 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -50,6 +50,8 @@ import org.apache.beam.sdk.util.BigQueryServices.LoadService; import org.apache.beam.sdk.util.BigQueryServicesImpl; import org.apache.beam.sdk.util.BigQueryTableInserter; import org.apache.beam.sdk.util.BigQueryTableRowIterator; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.Reshuffle; @@ -1015,7 +1017,19 @@ public class BigQueryIO { table.setProjectId(options.getProject()); } String jobIdToken = UUID.randomUUID().toString(); - String tempFilePrefix = options.getTempLocation() + "/BigQuerySinkTemp/" + jobIdToken; + String tempLocation = options.getTempLocation(); + String tempFilePrefix; + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + tempFilePrefix = factory.resolve( + factory.resolve(tempLocation, "BigQuerySinkTemp"), + jobIdToken); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve BigQuery temp location in %s", tempLocation), + e); + } + BigQueryServices bqServices = getBigQueryServices(); return input.apply("Write", org.apache.beam.sdk.io.Write.to( new BigQuerySink(
