Repository: beam
Updated Branches:
  refs/heads/master 434eadb53 -> 1339dd706


Move a few more functions into BigQueryHelper


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2381dee4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2381dee4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2381dee4

Branch: refs/heads/master
Commit: 2381dee48faf52882b873597d49b9490967f6adb
Parents: d6ef010
Author: Reuven Lax <[email protected]>
Authored: Sat Mar 18 13:03:25 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Tue Mar 28 08:46:15 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    | 40 +++++++++++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 67 ++------------------
 2 files changed, 44 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2381dee4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 37ff124..c5156e9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
@@ -196,6 +197,43 @@ class BigQueryHelpers {
     }
   }
 
+  static void verifyDatasetPresence(DatasetService datasetService, 
TableReference table) {
+    try {
+      datasetService.getDataset(table.getProjectId(), table.getDatasetId());
+    } catch (Exception e) {
+      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+      if ((e instanceof IOException) && 
errorExtractor.itemNotFound((IOException) e)) {
+        throw new IllegalArgumentException(
+            String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", 
toTableSpec(table)), e);
+      } else if (e instanceof  RuntimeException) {
+        throw (RuntimeException) e;
+      } else {
+        throw new RuntimeException(
+            String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, 
"dataset",
+                toTableSpec(table)),
+            e);
+      }
+    }
+  }
+
+  static void verifyTablePresence(DatasetService datasetService, 
TableReference table) {
+    try {
+      datasetService.getTable(table);
+    } catch (Exception e) {
+      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+      if ((e instanceof IOException) && 
errorExtractor.itemNotFound((IOException) e)) {
+        throw new IllegalArgumentException(
+            String.format(RESOURCE_NOT_FOUND_ERROR, "table", 
toTableSpec(table)), e);
+      } else if (e instanceof  RuntimeException) {
+        throw (RuntimeException) e;
+      } else {
+        throw new RuntimeException(
+            String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, 
"table",
+                toTableSpec(table)), e);
+      }
+    }
+  }
+
   @VisibleForTesting
   static class JsonSchemaToTableSchema
       implements SerializableFunction<String, TableSchema> {
@@ -283,7 +321,7 @@ class BigQueryHelpers {
       implements SerializableFunction<String, TableReference> {
     private final String executingProject;
 
-    public CreateJsonTableRefFromUuid(String executingProject) {
+    CreateJsonTableRefFromUuid(String executingProject) {
       this.executingProject = executingProject;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2381dee4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 4917083..cc6ec09 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -29,22 +29,17 @@ import 
com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.value.AutoValue;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import com.google.common.io.CountingOutputStream;
+
 import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.List;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -70,7 +65,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PBegin;
@@ -229,17 +223,6 @@ public class BigQueryIO {
 
   static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
 
-  private static final String RESOURCE_NOT_FOUND_ERROR =
-      "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s 
before pipeline"
-          + " execution. If the %1$s is created by an earlier stage of the 
pipeline, this"
-          + " validation can be disabled using #withoutValidation.";
-
-  private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR =
-      "Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the 
%1$s is created by"
-          + " an earlier stage of the pipeline, this validation can be 
disabled using"
-          + " #withoutValidation.";
-
-
   /**
    * A formatting function that maps a TableRow to itself. This allows sending 
a
    * {@code PCollection<TableRow>} directly to BigQueryIO.Write.
@@ -451,8 +434,8 @@ public class BigQueryIO {
         checkState(table.isAccessible(), "Cannot call validate if table is 
dynamically set.");
         // Check for source table presence for early failure notification.
         DatasetService datasetService = 
getBigQueryServices().getDatasetService(bqOptions);
-        verifyDatasetPresence(datasetService, table.get());
-        verifyTablePresence(datasetService, table.get());
+        BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+        BigQueryHelpers.verifyTablePresence(datasetService, table.get());
       } else if (getValidate() && getQuery() != null) {
         checkState(getQuery().isAccessible(), "Cannot call validate if query 
is dynamically set.");
         JobService jobService = getBigQueryServices().getJobService(bqOptions);
@@ -953,9 +936,9 @@ public class BigQueryIO {
         // Note that a presence check can fail when the table or dataset is 
created by an earlier
         // stage of the pipeline. For these cases the #withoutValidation 
method can be used to
         // disable the check.
-        verifyDatasetPresence(datasetService, table);
+        BigQueryHelpers.verifyDatasetPresence(datasetService, table);
         if (getCreateDisposition() == 
BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
-          verifyTablePresence(datasetService, table);
+          BigQueryHelpers.verifyTablePresence(datasetService, table);
         }
         if (getWriteDisposition() == 
BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
           BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
@@ -1085,46 +1068,6 @@ public class BigQueryIO {
 
   }
 
-  private static void verifyDatasetPresence(DatasetService datasetService, 
TableReference table) {
-    try {
-      datasetService.getDataset(table.getProjectId(), table.getDatasetId());
-    } catch (Exception e) {
-      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-      if ((e instanceof IOException) && 
errorExtractor.itemNotFound((IOException) e)) {
-        throw new IllegalArgumentException(
-            String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", 
BigQueryHelpers.toTableSpec(table)),
-            e);
-      } else if (e instanceof  RuntimeException) {
-        throw (RuntimeException) e;
-      } else {
-        throw new RuntimeException(
-            String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, 
"dataset",
-                BigQueryHelpers.toTableSpec(table)),
-            e);
-      }
-    }
-  }
-
-  private static void verifyTablePresence(DatasetService datasetService, 
TableReference table) {
-    try {
-      datasetService.getTable(table);
-    } catch (Exception e) {
-      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-      if ((e instanceof IOException) && 
errorExtractor.itemNotFound((IOException) e)) {
-        throw new IllegalArgumentException(
-            String.format(RESOURCE_NOT_FOUND_ERROR, "table", 
BigQueryHelpers.toTableSpec(table)),
-            e);
-      } else if (e instanceof  RuntimeException) {
-        throw (RuntimeException) e;
-      } else {
-        throw new RuntimeException(
-            String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, 
"table",
-                BigQueryHelpers.toTableSpec(table)),
-            e);
-      }
-    }
-  }
-
   /**
    * Clear the cached map of created tables. Used for testing.
    */

Reply via email to