Repository: beam Updated Branches: refs/heads/master 36ed6dc3c -> a2292a7dc
Remove IOChannelUtils from BigQuery TableRowWriter Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc519138 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc519138 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc519138 Branch: refs/heads/master Commit: cc519138e3ea25b92cf710c6f297da2c472f0eef Parents: 36ed6dc Author: Vikas Kedigehalli <[email protected]> Authored: Sun Apr 30 23:29:20 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon May 1 16:03:55 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/TableRowWriter.java | 27 ++++++++++---------- .../io/gcp/bigquery/WriteBundlesToFiles.java | 2 +- .../sdk/io/gcp/bigquery/WritePartition.java | 2 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +- 4 files changed, 17 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cc519138/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index cb51158..f9d8785 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -26,7 +26,8 @@ import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.util.MimeTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,17 +40,17 @@ class TableRowWriter { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); private final String tempFilePrefix; private String id; - private String fileName; + private ResourceId resourceId; private WritableByteChannel channel; protected String mimeType = MimeTypes.TEXT; private CountingOutputStream out; public static final class Result { - final String filename; + final ResourceId resourceId; final long byteSize; - public Result(String filename, long byteSize) { - this.filename = filename; + public Result(ResourceId resourceId, long byteSize) { + this.resourceId = resourceId; this.byteSize = byteSize; } } @@ -60,22 +61,22 @@ class TableRowWriter { public final void open(String uId) throws Exception { id = uId; - fileName = tempFilePrefix + id; - LOG.debug("Opening {}.", fileName); - channel = IOChannelUtils.create(fileName, mimeType); + resourceId = FileSystems.matchNewResource(tempFilePrefix + id, false); + LOG.debug("Opening {}.", resourceId); + channel = FileSystems.create(resourceId, mimeType); try { out = new CountingOutputStream(Channels.newOutputStream(channel)); - LOG.debug("Writing header to {}.", fileName); + LOG.debug("Writing header to {}.", resourceId); } catch (Exception e) { try { - LOG.error("Writing header to {} failed, closing channel.", fileName); + LOG.error("Writing header to {} failed, closing channel.", resourceId); channel.close(); } catch (IOException closeException) { - LOG.error("Closing channel for {} failed", fileName); + LOG.error("Closing channel for {} failed", resourceId); } throw e; } - LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); + LOG.debug("Starting write of bundle {} to {}.", this.id, resourceId); } public void write(TableRow value) throws Exception { @@ -85,6 +86,6 @@ class TableRowWriter { public final Result close() throws IOException { channel.close(); - return new Result(fileName, out.getCount()); + return new Result(resourceId, out.getCount()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/cc519138/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index d337476..5f89067 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -140,7 +140,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund public void finishBundle(Context c) throws Exception { for (Map.Entry<TableDestination, TableRowWriter> entry : writers.entrySet()) { TableRowWriter.Result result = entry.getValue().close(); - c.output(new Result(result.filename, result.byteSize, entry.getKey())); + c.output(new Result(result.resourceId.toString(), result.byteSize, entry.getKey())); } writers.clear(); } http://git-wip-us.apache.org/repos/asf/beam/blob/cc519138/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 9414909..0ae1768 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -128,7 +128,7 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List< TableRowWriter writer = new TableRowWriter(c.element()); writer.open(UUID.randomUUID().toString()); TableRowWriter.Result writerResult = writer.close(); - results.add(new Result(writerResult.filename, writerResult.byteSize, + results.add(new Result(writerResult.resourceId.toString(), writerResult.byteSize, new TableDestination(singletonTable, singletonOutputTableDescription))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/cc519138/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index a46c1fe..baa5621 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1775,7 +1775,7 @@ public class BigQueryIOTest implements Serializable { for (int i = 0; i < numFiles; ++i) { String fileName = String.format("files%05d", i); writer.open(fileName); - fileNames.add(writer.close().filename); + fileNames.add(writer.close().resourceId.toString()); } fileNames.add(tempFilePrefix + String.format("files%05d", numFiles));
