Repository: beam Updated Branches: refs/heads/master 434eadb53 -> 1339dd706
Move a few more functions into BigQueryHelper Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2381dee4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2381dee4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2381dee4 Branch: refs/heads/master Commit: 2381dee48faf52882b873597d49b9490967f6adb Parents: d6ef010 Author: Reuven Lax <[email protected]> Authored: Sat Mar 18 13:03:25 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Mar 28 08:46:15 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 40 +++++++++++- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 67 ++------------------ 2 files changed, 44 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2381dee4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 37ff124..c5156e9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -196,6 +197,43 @@ class BigQueryHelpers { } } + static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { + try { + datasetService.getDataset(table.getProjectId(), table.getDatasetId()); + } catch (Exception e) { + ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { + throw new IllegalArgumentException( + String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", toTableSpec(table)), e); + } else if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new RuntimeException( + String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", + toTableSpec(table)), + e); + } + } + } + + static void verifyTablePresence(DatasetService datasetService, TableReference table) { + try { + datasetService.getTable(table); + } catch (Exception e) { + ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { + throw new IllegalArgumentException( + String.format(RESOURCE_NOT_FOUND_ERROR, "table", toTableSpec(table)), e); + } else if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new RuntimeException( + String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", + toTableSpec(table)), e); + } + } + } + @VisibleForTesting static class JsonSchemaToTableSchema implements SerializableFunction<String, TableSchema> { @@ -283,7 +321,7 @@ class BigQueryHelpers { implements SerializableFunction<String, TableReference> { private final String executingProject; - public CreateJsonTableRefFromUuid(String executingProject) { + CreateJsonTableRefFromUuid(String executingProject) { this.executingProject = executingProject; } http://git-wip-us.apache.org/repos/asf/beam/blob/2381dee4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 4917083..cc6ec09 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -29,22 +29,17 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.value.AutoValue; -import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import com.google.common.io.CountingOutputStream; + import java.io.IOException; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.List; import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; @@ -70,7 +65,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PBegin; @@ -229,17 +223,6 @@ public class BigQueryIO { static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP); - private static final String RESOURCE_NOT_FOUND_ERROR = - "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline" - + " execution. If the %1$s is created by an earlier stage of the pipeline, this" - + " validation can be disabled using #withoutValidation."; - - private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR = - "Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the %1$s is created by" - + " an earlier stage of the pipeline, this validation can be disabled using" - + " #withoutValidation."; - - /** * A formatting function that maps a TableRow to itself. This allows sending a * {@code PCollection<TableRow>} directly to BigQueryIO.Write. @@ -451,8 +434,8 @@ public class BigQueryIO { checkState(table.isAccessible(), "Cannot call validate if table is dynamically set."); // Check for source table presence for early failure notification. DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); - verifyDatasetPresence(datasetService, table.get()); - verifyTablePresence(datasetService, table.get()); + BigQueryHelpers.verifyDatasetPresence(datasetService, table.get()); + BigQueryHelpers.verifyTablePresence(datasetService, table.get()); } else if (getValidate() && getQuery() != null) { checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set."); JobService jobService = getBigQueryServices().getJobService(bqOptions); @@ -953,9 +936,9 @@ public class BigQueryIO { // Note that a presence check can fail when the table or dataset is created by an earlier // stage of the pipeline. For these cases the #withoutValidation method can be used to // disable the check. - verifyDatasetPresence(datasetService, table); + BigQueryHelpers.verifyDatasetPresence(datasetService, table); if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { - verifyTablePresence(datasetService, table); + BigQueryHelpers.verifyTablePresence(datasetService, table); } if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table); @@ -1085,46 +1068,6 @@ public class BigQueryIO { } - private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { - try { - datasetService.getDataset(table.getProjectId(), table.getDatasetId()); - } catch (Exception e) { - ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { - throw new IllegalArgumentException( - String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryHelpers.toTableSpec(table)), - e); - } else if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } else { - throw new RuntimeException( - String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", - BigQueryHelpers.toTableSpec(table)), - e); - } - } - } - - private static void verifyTablePresence(DatasetService datasetService, TableReference table) { - try { - datasetService.getTable(table); - } catch (Exception e) { - ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { - throw new IllegalArgumentException( - String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryHelpers.toTableSpec(table)), - e); - } else if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } else { - throw new RuntimeException( - String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", - BigQueryHelpers.toTableSpec(table)), - e); - } - } - } - /** * Clear the cached map of created tables. Used for testing. */
