http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index 3c67c3d..a2454fb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -1,12 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import com.google.api.client.json.JsonFactory; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -29,9 +47,18 @@ import java.io.ByteArrayInputStream; import java.io.FileReader; import java.io.IOException; import java.io.Serializable; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.TableRowJsonCoder; @@ -40,10 +67,13 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.Transport; import org.joda.time.Duration; /** + * A fake implementation of BigQuery's job service. 
*/ class FakeJobService implements JobService, Serializable { static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); @@ -66,6 +96,8 @@ class FakeJobService implements JobService, Serializable { private static final com.google.common.collect.Table<String, String, JobInfo> allJobs = HashBasedTable.create(); + private static final com.google.common.collect.Table<String, String, List<String>> + filesForLoadJobs = HashBasedTable.create(); private static final com.google.common.collect.Table<String, String, JobStatistics> dryRunQueryResults = HashBasedTable.create(); @@ -82,6 +114,18 @@ class FakeJobService implements JobService, Serializable { job.setConfiguration(new JobConfiguration().setLoad(loadConfig)); job.setKind(" bigquery#job"); job.setStatus(new JobStatus().setState("PENDING")); + + // Copy the files to a new location for import, as the temporary files will be deleted by + // the caller. + if (loadConfig.getSourceUris().size() > 0) { + List<String> loadFiles = Lists.newArrayList(); + for (String filename : loadConfig.getSourceUris()) { + loadFiles.add(filename + ThreadLocalRandom.current().nextInt()); + } + IOChannelUtils.getFactory(loadFiles.get(0)).copy(loadConfig.getSourceUris(), loadFiles); + filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles); + } + allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job)); } } @@ -91,8 +135,6 @@ class FakeJobService implements JobService, Serializable { throws InterruptedException, IOException { checkArgument(extractConfig.getDestinationFormat().equals("AVRO"), "Only extract to AVRO is supported"); - checkArgument(extractConfig.getDestinationUris().size() == 1, - "Must specify exactly one destination URI."); synchronized (allJobs) { Job job = new Job(); job.setJobReference(jobRef); @@ -106,6 +148,14 @@ class FakeJobService implements JobService, Serializable { @Override public void startQueryJob(JobReference jobRef, JobConfigurationQuery query) throws IOException, 
InterruptedException { + synchronized (allJobs) { + Job job = new Job(); + job.setJobReference(jobRef); + job.setConfiguration(new JobConfiguration().setQuery(query)); + job.setKind(" bigquery#job"); + job.setStatus(new JobStatus().setState("PENDING")); + allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job)); + } } @Override @@ -127,8 +177,8 @@ class FakeJobService implements JobService, Serializable { BackOff backoff = FluentBackoff.DEFAULT .withMaxRetries(maxAttempts) - .withInitialBackoff(Duration.millis(50)) - .withMaxBackoff(Duration.standardMinutes(1)) + .withInitialBackoff(Duration.millis(10)) + .withMaxBackoff(Duration.standardSeconds(1)) .backoff(); Sleeper sleeper = Sleeper.DEFAULT; try { @@ -136,7 +186,8 @@ class FakeJobService implements JobService, Serializable { Job job = getJob(jobRef); if (job != null) { JobStatus status = job.getStatus(); - if (status != null && status.getState() != null && status.getState().equals("DONE")) { + if (status != null && status.getState() != null + && (status.getState().equals("DONE") || status.getState().equals("FAILED"))) { return job; } } @@ -173,12 +224,15 @@ class FakeJobService implements JobService, Serializable { if (job == null) { return null; } - ++job.getJobCount; - if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) { - job.job.getStatus().setState("RUNNING"); - } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 1) { - runJob(job.job); - job.job.getStatus().setState("DONE"); + try { + ++job.getJobCount; + if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) { + job.job.getStatus().setState("RUNNING"); + } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 1) { + job.job.setStatus(runJob(job.job)); + } + } catch (Exception e) { + job.job.getStatus().setState("FAILED").setErrorResult(new ErrorProto()); } return JSON_FACTORY.fromString(JSON_FACTORY.toString(job.job), Job.class); } @@ -187,41 +241,50 @@ class FakeJobService implements JobService, Serializable 
{ } } - private void runJob(Job job) throws InterruptedException, IOException { + private JobStatus runJob(Job job) throws InterruptedException, IOException { if (job.getConfiguration().getLoad() != null) { - runLoadJob(job.getConfiguration().getLoad()); + return runLoadJob(job.getJobReference(), job.getConfiguration().getLoad()); } else if (job.getConfiguration().getCopy() != null) { - runCopyJob(job.getConfiguration().getCopy()); + return runCopyJob(job.getConfiguration().getCopy()); } else if (job.getConfiguration().getExtract() != null) { - runExtractJob(job, job.getConfiguration().getExtract()); + return runExtractJob(job, job.getConfiguration().getExtract()); + } else if (job.getConfiguration().getQuery() != null) { + return runQueryJob(job.getConfiguration().getQuery()); } + return new JobStatus().setState("DONE"); } - private void validateDispositions(Table table, CreateDisposition createDisposition, - WriteDisposition writeDisposition) + private boolean validateDispositions(Table table, CreateDisposition createDisposition, + WriteDisposition writeDisposition) throws InterruptedException, IOException { if (table == null) { - checkState(createDisposition != CreateDisposition.CREATE_NEVER, - "CreateDisposition == CREATE_NEVER but the table doesn't exist."); + if (createDisposition == CreateDisposition.CREATE_NEVER) { + return false; + } } else if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) { datasetService.deleteTable(table.getTableReference()); } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) { List<TableRow> allRows = datasetService.getAllRows(table.getTableReference().getProjectId(), table.getTableReference().getDatasetId(), table.getTableReference().getTableId()); - checkState(allRows.isEmpty(), "Write disposition was set to WRITE_EMPTY," - + " but the table was not empty."); + if (!allRows.isEmpty()) { + return false; + } } + return true; } - private void runLoadJob(JobConfigurationLoad load) + + private JobStatus 
runLoadJob(JobReference jobRef, JobConfigurationLoad load) throws InterruptedException, IOException { TableReference destination = load.getDestinationTable(); TableSchema schema = load.getSchema(); - List<String> sourceFiles = load.getSourceUris(); + List<String> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId()); WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition()); CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition()); checkArgument(load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON")); Table existingTable = datasetService.getTable(destination); - validateDispositions(existingTable, createDisposition, writeDisposition); + if (!validateDispositions(existingTable, createDisposition, writeDisposition)) { + return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); + } datasetService.createTable(new Table().setTableReference(destination).setSchema(schema)); @@ -230,31 +293,52 @@ class FakeJobService implements JobService, Serializable { rows.addAll(readRows(filename)); } datasetService.insertAll(destination, rows, null); + return new JobStatus().setState("DONE"); } - private void runCopyJob(JobConfigurationTableCopy copy) + private JobStatus runCopyJob(JobConfigurationTableCopy copy) throws InterruptedException, IOException { List<TableReference> sources = copy.getSourceTables(); TableReference destination = copy.getDestinationTable(); WriteDisposition writeDisposition = WriteDisposition.valueOf(copy.getWriteDisposition()); CreateDisposition createDisposition = CreateDisposition.valueOf(copy.getCreateDisposition()); Table existingTable = datasetService.getTable(destination); - validateDispositions(existingTable, createDisposition, writeDisposition); + if (!validateDispositions(existingTable, createDisposition, writeDisposition)) { + return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); + } List<TableRow> allRows = 
Lists.newArrayList(); for (TableReference source : sources) { allRows.addAll(datasetService.getAllRows( source.getProjectId(), source.getDatasetId(), source.getTableId())); } + datasetService.createTable(new Table().setTableReference(destination)); datasetService.insertAll(destination, allRows, null); + return new JobStatus().setState("DONE"); } - private void runExtractJob(Job job, JobConfigurationExtract extract) { + private JobStatus runExtractJob(Job job, JobConfigurationExtract extract) + throws InterruptedException, IOException { TableReference sourceTable = extract.getSourceTable(); - extract.getDestinationUris().get(0); - List<Long> destinationFileCounts = Lists.newArrayList(0L); + + List<TableRow> rows = datasetService.getAllRows( + sourceTable.getProjectId(), sourceTable.getDatasetId(), sourceTable.getTableId()); + TableSchema schema = datasetService.getTable(sourceTable).getSchema(); + List<Long> destinationFileCounts = Lists.newArrayList(); + for (String destination : extract.getDestinationUris()) { + destinationFileCounts.add(writeRows(sourceTable.getTableId(), rows, schema, destination)); + } job.setStatistics(new JobStatistics().setExtract( new JobStatistics4().setDestinationUriFileCounts(destinationFileCounts))); + return new JobStatus().setState("DONE"); + } + + private JobStatus runQueryJob(JobConfigurationQuery query) + throws IOException, InterruptedException { + List<TableRow> rows = FakeBigQueryServices.rowsFromEncodedQuery(query.getQuery()); + datasetService.createTable(new Table().setTableReference(query.getDestinationTable())); + datasetService.insertAll(query.getDestinationTable(), rows, null); + return new JobStatus().setState("DONE"); } private List<TableRow> readRows(String filename) throws IOException { @@ -270,4 +354,42 @@ class FakeJobService implements JobService, Serializable { } return tableRows; } + + private long writeRows(String tableId, List<TableRow> rows, TableSchema schema, + String destinationPattern) throws IOException { 
+ Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableId, schema.getFields()); + List<TableRow> rowsToWrite = Lists.newArrayList(); + int shard = 0; + for (int i = 0; i < rows.size(); ++i) { + rowsToWrite.add(rows.get(i)); + if (rowsToWrite.size() == 5) { + writeRowsHelper(rowsToWrite, avroSchema, destinationPattern, shard++); + rowsToWrite.clear(); + } + } + if (!rowsToWrite.isEmpty()) { + writeRowsHelper(rowsToWrite, avroSchema, destinationPattern, shard++); + } + return shard; + } + + private void writeRowsHelper(List<TableRow> rows, Schema avroSchema, + String destinationPattern, int shard) throws IOException { + String filename = destinationPattern.replace("*", String.format("%012d", shard)); + try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.BINARY); + DataFileWriter<GenericRecord> tableRowWriter = + new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(avroSchema)) + .create(avroSchema, Channels.newOutputStream(channel))) { + for (Map<String, Object> record : rows) { + GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(avroSchema); + for (Map.Entry<String, Object> field : record.entrySet()) { + genericRecordBuilder.set(field.getKey(), field.getValue()); + } + tableRowWriter.append(genericRecordBuilder.build()); + } + } catch (IOException e) { + throw new IllegalStateException( + String.format("Could not create destination for extract job %s", filename), e); + } + } }
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java index b2fc170..d52723b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.Table; @@ -7,23 +24,31 @@ import java.util.ArrayList; import java.util.List; /** - * Created by relax on 3/30/17. + * Encapsulates a BigQuery Table and its contents. 
*/ class TableContainer { Table table; List<TableRow> rows; List<String> ids; - + Long sizeBytes; TableContainer(Table table) { this.table = table; this.rows = new ArrayList<>(); this.ids = new ArrayList<>(); + this.sizeBytes = 0L; } - TableContainer addRow(TableRow row, String id) { + long addRow(TableRow row, String id) { rows.add(row); ids.add(id); - return this; + long rowSize = row.toString().length(); + Long tableSize = table.getNumBytes(); + if (tableSize == null) { + table.setNumBytes(rowSize); + } else { + table.setNumBytes(tableSize + rowSize); + } + return rowSize; } Table getTable() {
