Repository: beam
Updated Branches:
  refs/heads/master 24c6ff44e -> 70e53e7dc

Remove IOChannelUtil/Factory from BigQueryIO

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40dc8443
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40dc8443
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40dc8443

Branch: refs/heads/master
Commit: 40dc844304a8432ee5c8e81b9cc806ef2ae3c1ea
Parents: 24c6ff4
Author: Vikas Kedigehalli <[email protected]>
Authored: Wed May 3 22:41:06 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Thu May 4 12:23:14 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  2 -
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    | 17 +++----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 35 ++++++++--------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 14 +++----
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   | 43 +++++---------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 15 +++----
 .../sdk/io/gcp/bigquery/FakeJobService.java     | 29 ++++++-----
 7 files changed, 65 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 78d39b5..f422135 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -205,7 +205,6 @@ class BatchLoads<DestinationT>
                 bigQueryServices,
                 jobIdTokenView,
                 schemasView,
-                stepUuid,
                 WriteDisposition.WRITE_EMPTY,
                 CreateDisposition.CREATE_IF_NEEDED,
                 dynamicDestinations))
@@ -243,7 +242,6 @@ class BatchLoads<DestinationT>
                 bigQueryServices,
                 jobIdTokenView,
                 schemasView,
-                stepUuid,
                 writeDisposition,
                 createDisposition,
                 dynamicDestinations))

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 6b4e518..318ea89 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -36,12 +36,12 @@ import java.util.UUID;
 import java.util.regex.Matcher;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 
 /** A set of helper functions and classes used by {@link BigQueryIO}. */
 public class BigQueryHelpers {
@@ -309,14 +309,9 @@ public class BigQueryHelpers {
   static String resolveTempLocation(
       String tempLocationDir, String bigQueryOperationName, String stepUuid) {
-    try {
-      IOChannelFactory factory = IOChannelUtils.getFactory(tempLocationDir);
-      return factory.resolve(
-          factory.resolve(tempLocationDir, bigQueryOperationName), stepUuid);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Failed to resolve temp destination directory in %s",
-              tempLocationDir), e);
-    }
+    return FileSystems.matchNewResource(tempLocationDir, true)
+        .resolve(bigQueryOperationName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+        .resolve(stepUuid, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+        .toString();
   }
 }
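The rewritten resolveTempLocation above replaces the nested IOChannelFactory.resolve calls with a chain of ResourceId.resolve calls. A minimal sketch of what the new chain yields; the bucket name and step UUID here are illustrative, and it assumes a filesystem for the path's scheme is registered (GCS is, once the google-cloud-platform IO module is on the classpath):

    String tempDir =
        FileSystems.matchNewResource("gs://my-bucket/tmp", true /* isDirectory */)
            .resolve("BigQueryExtractTemp",
                ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve("0ab1c2d3-step-uuid",
                ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
            .toString();
    // Expected: "gs://my-bucket/tmp/BigQueryExtractTemp/0ab1c2d3-step-uuid/"
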
http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index c76ee86..0e36393 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -38,8 +38,8 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -50,6 +50,10 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
@@ -67,8 +71,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
@@ -520,19 +522,14 @@ public class BigQueryIO {
               Job extractJob =
                   getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
 
-              Collection<String> extractFiles = null;
               if (extractJob != null) {
-                extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
-              } else {
-                IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
-                Collection<String> dirMatch = factory.match(extractDestinationDir);
-                if (!dirMatch.isEmpty()) {
-                  extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
+                List<ResourceId> extractFiles =
+                    getExtractFilePaths(extractDestinationDir, extractJob);
+                if (extractFiles != null && !extractFiles.isEmpty()) {
+                  FileSystems.delete(extractFiles,
+                      MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
                 }
               }
-              if (extractFiles != null && !extractFiles.isEmpty()) {
-                IOChannelUtils.getFactory(extractFiles.iterator().next()).remove(extractFiles);
-              }
             }
           };
       return input.getPipeline()
@@ -583,7 +580,7 @@ public class BigQueryIO {
     return String.format("%s/%s", extractDestinationDir, "*.avro");
   }
 
-  static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
+  static List<ResourceId> getExtractFilePaths(String extractDestinationDir, Job extractJob)
       throws IOException {
     JobStatistics jobStats = extractJob.getStatistics();
     List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
@@ -597,11 +594,13 @@ public class BigQueryIO {
     }
     long filesCount = counts.get(0);
 
-    ImmutableList.Builder<String> paths = ImmutableList.builder();
-    IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+    ImmutableList.Builder<ResourceId> paths = ImmutableList.builder();
+    ResourceId extractDestinationDirResourceId =
+        FileSystems.matchNewResource(extractDestinationDir, true /* isDirectory */);
     for (long i = 0; i < filesCount; ++i) {
-      String filePath =
-          factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
+      ResourceId filePath = extractDestinationDirResourceId.resolve(
+          String.format("%012d%s", i, ".avro"),
+          ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
       paths.add(filePath);
     }
     return paths.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 49000d6..945c7d4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -38,6 +38,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -90,7 +91,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
         resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
     String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
-    List<String> tempFiles = executeExtract(
+    List<ResourceId> tempFiles = executeExtract(
         extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir);
 
     TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
@@ -116,7 +117,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     return TableRowJsonCoder.of();
  }
 
-  private List<String> executeExtract(
+  private List<ResourceId> executeExtract(
       String jobId, TableReference table, JobService jobService, String executingProject,
       String extractDestinationDir) throws InterruptedException, IOException {
@@ -143,12 +144,11 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     LOG.info("BigQuery extract job completed: {}", jobId);
 
-    List<String> tempFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
-    return ImmutableList.copyOf(tempFiles);
+    return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
   }
 
   private List<BoundedSource<TableRow>> createSources(
-      List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
+      List<ResourceId> files, TableSchema tableSchema) throws IOException, InterruptedException {
     final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
 
     SerializableFunction<GenericRecord, TableRow> function =
@@ -160,9 +160,9 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
         }};
 
     List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
-    for (String fileName : files) {
+    for (ResourceId file : files) {
       avroSources.add(new TransformingSource<>(
-          AvroSource.from(fileName), function, getDefaultOutputCoder()));
+          AvroSource.from(file.toString()), function, getDefaultOutputCoder()));
     }
     return ImmutableList.copyOf(avroSources);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c480b42..e7dba2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -18,37 +18,30 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
-
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.FileIOChannelFactory;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.slf4j.Logger;
@@ -74,7 +67,6 @@ class WriteTables<DestinationT>
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
   private final PCollectionView<Map<DestinationT, String>> schemasView;
-  private final String stepUuid;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
@@ -84,7 +76,6 @@ class WriteTables<DestinationT>
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
       PCollectionView<Map<DestinationT, String>> schemasView,
-      String stepUuid,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
       DynamicDestinations<?, DestinationT> dynamicDestinations) {
@@ -92,7 +83,6 @@ class WriteTables<DestinationT>
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.schemasView = schemasView;
-    this.stepUuid = stepUuid;
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
@@ -114,8 +104,6 @@ class WriteTables<DestinationT>
           tableReference, tableDestination.getTableDescription());
     }
 
-    String tempFilePrefix = resolveTempLocation(
-        c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
     Integer partition = c.element().getKey().getShardNumber();
     List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
     String jobIdPrefix =
@@ -137,7 +125,7 @@ class WriteTables<DestinationT>
         tableDestination.getTableDescription());
     c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
 
-    removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles);
+    removeTemporaryFiles(partitionFiles);
   }
 
   private void load(
@@ -198,22 +186,11 @@ class WriteTables<DestinationT>
               BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
   }
 
-  static void removeTemporaryFiles(
-      PipelineOptions options, String tempFilePrefix, Collection<String> files) throws IOException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
-    if (factory instanceof GcsIOChannelFactory) {
-      GcsUtil gcsUtil = new GcsUtilFactory().create(options);
-      gcsUtil.remove(files);
-    } else if (factory instanceof FileIOChannelFactory) {
-      for (String filename : files) {
-        LOG.debug("Removing file {}", filename);
-        boolean exists = Files.deleteIfExists(Paths.get(filename));
-        if (!exists) {
-          LOG.debug("{} does not exist.", filename);
-        }
-      }
-    } else {
-      throw new IOException("Unrecognized file system.");
+  static void removeTemporaryFiles(Collection<String> files) throws IOException {
+    ImmutableList.Builder<ResourceId> fileResources = ImmutableList.builder();
+    for (String file: files) {
+      fileResources.add(FileSystems.matchNewResource(file, false/* isDirectory */));
     }
+    FileSystems.delete(fileResources.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 026afce..aabae3e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -83,7 +83,9 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -118,7 +120,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -1820,7 +1821,9 @@ public class BigQueryIOTest implements Serializable {
       for (int k = 0; k < numFilesPerPartition; ++k) {
         String filename = Paths.get(baseDir.toString(),
             String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString();
-        try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.TEXT)) {
+        ResourceId fileResource =
+            FileSystems.matchNewResource(filename, false /* isDirectory */);
+        try (WritableByteChannel channel = FileSystems.create(fileResource, MimeTypes.TEXT)) {
           try (OutputStream output = Channels.newOutputStream(channel)) {
             TableRow tableRow = new TableRow().set("name", tableName);
             TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
@@ -1858,7 +1861,6 @@ public class BigQueryIOTest implements Serializable {
             fakeBqServices,
             jobIdTokenView,
             schemaMapView,
-            stepUuid,
             WriteDisposition.WRITE_EMPTY,
             CreateDisposition.CREATE_IF_NEEDED,
             new IdentityDynamicTables());
@@ -1904,14 +1906,9 @@ public class BigQueryIOTest implements Serializable {
     File tempDir = new File(bqOptions.getTempLocation());
     testNumFiles(tempDir, 10);
 
-    WriteTables.removeTemporaryFiles(bqOptions, tempFilePrefix, fileNames);
+    WriteTables.removeTemporaryFiles(fileNames);
 
     testNumFiles(tempDir, 0);
-
-    for (String fileName : fileNames) {
-      loggedWriteTables.verifyDebug("Removing file " + fileName);
-    }
-    loggedWriteTables.verifyDebug(fileNames.get(numFiles) + " does not exist.");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 13d345e..ee3af0b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -40,6 +40,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.io.BufferedReader;
@@ -61,12 +62,14 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.Transport;
 import org.joda.time.Duration;
@@ -95,7 +98,7 @@ class FakeJobService implements JobService, Serializable {
       HashBasedTable.create();
   private static int numExtractJobCalls = 0;
 
-  private static final com.google.common.collect.Table<String, String, List<String>>
+  private static final com.google.common.collect.Table<String, String, List<ResourceId>>
       filesForLoadJobs = HashBasedTable.create();
   private static final com.google.common.collect.Table<String, String, JobStatistics>
       dryRunQueryResults = HashBasedTable.create();
@@ -117,12 +120,17 @@ class FakeJobService implements JobService, Serializable {
       // Copy the files to a new location for import, as the temporary files will be deleted by
      // the caller.
       if (loadConfig.getSourceUris().size() > 0) {
-        List<String> loadFiles = Lists.newArrayList();
+        ImmutableList.Builder<ResourceId> sourceFiles = ImmutableList.builder();
+        ImmutableList.Builder<ResourceId> loadFiles = ImmutableList.builder();
         for (String filename : loadConfig.getSourceUris()) {
-          loadFiles.add(filename + ThreadLocalRandom.current().nextInt());
+          sourceFiles.add(FileSystems.matchNewResource(filename, false /* isDirectory */));
+          loadFiles.add(FileSystems.matchNewResource(
+              filename + ThreadLocalRandom.current().nextInt(), false /* isDirectory */));
         }
-        IOChannelUtils.getFactory(loadFiles.get(0)).copy(loadConfig.getSourceUris(), loadFiles);
-        filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles);
+
+        FileSystems.copy(sourceFiles.build(), loadFiles.build(),
+            MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+        filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles.build());
       }
 
       allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
@@ -286,7 +294,7 @@ class FakeJobService implements JobService, Serializable {
       throws InterruptedException, IOException {
     TableReference destination = load.getDestinationTable();
     TableSchema schema = load.getSchema();
-    List<String> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
+    List<ResourceId> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
     WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition());
     CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition());
     checkArgument(load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON"));
@@ -298,8 +306,8 @@ class FakeJobService implements JobService, Serializable {
     datasetService.createTable(new Table().setTableReference(destination).setSchema(schema));
 
     List<TableRow> rows = Lists.newArrayList();
-    for (String filename : sourceFiles) {
-      rows.addAll(readRows(filename));
+    for (ResourceId filename : sourceFiles) {
+      rows.addAll(readRows(filename.toString()));
     }
     datasetService.insertAll(destination, rows, null);
     return new JobStatus().setState("DONE");
@@ -385,7 +393,8 @@ class FakeJobService implements JobService, Serializable {
   private void writeRowsHelper(List<TableRow> rows, Schema avroSchema,
                                String destinationPattern, int shard) throws IOException {
     String filename = destinationPattern.replace("*", String.format("%012d", shard));
-    try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.BINARY);
+    try (WritableByteChannel channel = FileSystems.create(
+        FileSystems.matchNewResource(filename, false /* isDirectory */), MimeTypes.BINARY);
         DataFileWriter<GenericRecord> tableRowWriter =
             new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(avroSchema))
                 .create(avroSchema, Channels.newOutputStream(channel))) {
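
Across the patch, IOChannelUtils.create and the per-filesystem delete/copy branches collapse into scheme-agnostic FileSystems calls. A minimal, self-contained sketch of the create-then-delete round trip these call sites rely on; the class name and the /tmp path are illustrative, and it assumes only the local filesystem that Beam registers by default:

    import com.google.common.collect.ImmutableList;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MoveOptions;
    import org.apache.beam.sdk.io.fs.ResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.util.MimeTypes;

    /** Illustrative round trip through the FileSystems API used in this patch. */
    public class FileSystemsRoundTrip {
      public static void main(String[] args) throws IOException {
        // Name a temp file under an illustrative local directory, using the same
        // resolve(...) pattern BigQueryIO uses for its extract files.
        ResourceId tempFile =
            FileSystems.matchNewResource("/tmp/beam-bq-demo", true /* isDirectory */)
                .resolve("000000000000.avro",
                    ResolveOptions.StandardResolveOptions.RESOLVE_FILE);

        // Write through a channel, as BigQueryIOTest and FakeJobService now do
        // instead of calling IOChannelUtils.create(...).
        try (WritableByteChannel channel = FileSystems.create(tempFile, MimeTypes.BINARY)) {
          channel.write(ByteBuffer.wrap("fake-avro-bytes".getBytes(StandardCharsets.UTF_8)));
        }

        // Delete without caring which filesystem backs the path; IGNORE_MISSING_FILES
        // gives removeTemporaryFiles its tolerant cleanup semantics.
        List<ResourceId> toDelete = ImmutableList.of(tempFile);
        FileSystems.delete(toDelete, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
        // A second delete of the same (now absent) file is a no-op, not an error.
        FileSystems.delete(toDelete, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
      }
    }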
