This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f2503ba  [BEAM-3008] Adds parameters templatization for Bigtable 
(#4357)
f2503ba is described below

commit f2503bad7511ef5f4856fc8af9c24b01a8561b3c
Author: dmytroivanov4206 <[email protected]>
AuthorDate: Wed Jan 24 06:51:31 2018 +0100

    [BEAM-3008] Adds parameters templatization for Bigtable (#4357)
---
 .../beam/sdk/io/gcp/bigtable/BigtableConfig.java   | 100 ++++----
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       | 222 +++++++++++-------
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   |   4 +-
 .../sdk/io/gcp/bigtable/BigtableConfigTest.java    | 252 +++++++++++++++++++++
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   | 104 +++++----
 5 files changed, 494 insertions(+), 188 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
index ba633d0..4d2e4ce 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.io.gcp.bigtable;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.cloud.bigtable.config.BigtableOptions;
@@ -30,6 +29,7 @@ import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
@@ -43,19 +43,19 @@ abstract class BigtableConfig implements Serializable {
    * Returns the project id being written to.
    */
   @Nullable
-  abstract String getProjectId();
+  abstract ValueProvider<String> getProjectId();
 
   /**
    * Returns the instance id being written to.
    */
   @Nullable
-  abstract String getInstanceId();
+  abstract ValueProvider<String> getInstanceId();
 
   /**
    * Returns the table being read from.
    */
   @Nullable
-  abstract String getTableId();
+  abstract ValueProvider<String> getTableId();
 
   /**
    * Returns the Google Cloud Bigtable instance being written to, and other 
parameters.
@@ -93,11 +93,11 @@ abstract class BigtableConfig implements Serializable {
   @AutoValue.Builder
   abstract static class Builder {
 
-    abstract Builder setProjectId(String projectId);
+    abstract Builder setProjectId(ValueProvider<String> projectId);
 
-    abstract Builder setInstanceId(String instanceId);
+    abstract Builder setInstanceId(ValueProvider<String> instanceId);
 
-    abstract Builder setTableId(String tableId);
+    abstract Builder setTableId(ValueProvider<String> tableId);
 
     /**
      * @deprecated will be replaced by bigtable options configurator.
@@ -115,18 +115,18 @@ abstract class BigtableConfig implements Serializable {
     abstract BigtableConfig build();
   }
 
-  BigtableConfig withProjectId(String projectId) {
-    checkNotNull(projectId, "Project Id of BigTable can not be null");
+  BigtableConfig withProjectId(ValueProvider<String> projectId) {
+    checkArgument(projectId != null, "Project Id of BigTable can not be null");
     return toBuilder().setProjectId(projectId).build();
   }
 
-  BigtableConfig withInstanceId(String instanceId) {
-    checkNotNull(instanceId, "Instance Id of BigTable can not be null");
+  BigtableConfig withInstanceId(ValueProvider<String> instanceId) {
+    checkArgument(instanceId != null, "Instance Id of BigTable can not be 
null");
     return toBuilder().setInstanceId(instanceId).build();
   }
 
-  BigtableConfig withTableId(String tableId) {
-    checkNotNull(tableId, "tableId can not be null");
+  BigtableConfig withTableId(ValueProvider<String> tableId) {
+    checkArgument(tableId != null, "tableId can not be null");
     return toBuilder().setTableId(tableId).build();
   }
 
@@ -135,13 +135,13 @@ abstract class BigtableConfig implements Serializable {
    */
   @Deprecated
   BigtableConfig withBigtableOptions(BigtableOptions options) {
-    checkNotNull(options, "Bigtable options can not be null");
+    checkArgument(options != null, "Bigtable options can not be null");
     return toBuilder().setBigtableOptions(options).build();
   }
 
   BigtableConfig withBigtableOptionsConfigurator(
     SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
configurator) {
-    checkNotNull(configurator, "configurator can not be null");
+    checkArgument(configurator != null, "configurator can not be null");
     return toBuilder().setBigtableOptionsConfigurator(configurator).build();
   }
 
@@ -151,47 +151,40 @@ abstract class BigtableConfig implements Serializable {
 
   @VisibleForTesting
   BigtableConfig withBigtableService(BigtableService bigtableService) {
-    checkNotNull(bigtableService, "bigtableService can not be null");
+    checkArgument(bigtableService != null, "bigtableService can not be null");
     return toBuilder().setBigtableService(bigtableService).build();
   }
 
   void validate() {
-    checkArgument(getProjectId() != null && !getProjectId().isEmpty()
+    checkArgument(getTableId() != null
+        && (!getTableId().isAccessible() || !getTableId().get().isEmpty()),
+      "Could not obtain Bigtable table id");
+
+    checkArgument(getProjectId() != null
+        && (!getProjectId().isAccessible() || !getProjectId().get().isEmpty())
         || getBigtableOptions() != null && getBigtableOptions().getProjectId() 
!= null
         && !getBigtableOptions().getProjectId().isEmpty(),
       "Could not obtain Bigtable project id");
 
-    checkArgument(getInstanceId() != null && !getInstanceId().isEmpty()
+    checkArgument(getInstanceId() != null
+        && (!getInstanceId().isAccessible() || 
!getInstanceId().get().isEmpty())
         || getBigtableOptions() != null && 
getBigtableOptions().getInstanceId() != null
         && !getBigtableOptions().getInstanceId().isEmpty(),
       "Could not obtain Bigtable instance id");
-
-    checkArgument(getTableId() != null && !getTableId().isEmpty(),
-      "Could not obtain Bigtable table id");
   }
 
   void populateDisplayData(DisplayData.Builder builder) {
-    builder.add(DisplayData.item("tableId", getTableId())
-      .withLabel("Table ID"));
+    builder
+      .addIfNotNull(DisplayData.item("projectId", 
getProjectId()).withLabel("Bigtable Project Id"))
+      .addIfNotNull(DisplayData.item("instanceId", getInstanceId())
+        .withLabel("Bigtable Instance Id"))
+      .addIfNotNull(DisplayData.item("tableId", 
getTableId()).withLabel("Bigtable Table Id"))
+      .add(DisplayData.item("withValidation", getValidate()).withLabel("Check 
is table exists"));
 
     if (getBigtableOptions() != null) {
       builder.add(DisplayData.item("bigtableOptions", 
getBigtableOptions().toString())
         .withLabel("Bigtable Options"));
     }
-
-    if (getProjectId() != null) {
-      builder.add(DisplayData.item("projectId", getProjectId())
-        .withLabel("Bigtable Project Id"));
-    }
-
-    if (getInstanceId() != null) {
-      builder.add(DisplayData.item("instanceId", getInstanceId())
-        .withLabel("Bigtable Instnace Id"));
-    }
-
-    builder.add(DisplayData.item("effectiveBigtableOptions",
-      effectiveUserProvidedBigtableOptions().build().toString())
-      .withLabel("Effective BigtableOptions resulted from configuration of 
given options"));
   }
 
   /**
@@ -225,18 +218,10 @@ abstract class BigtableConfig implements Serializable {
     return new BigtableServiceImpl(bigtableOptions.build());
   }
 
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(BigtableConfig.class)
-      .add("projectId", getProjectId())
-      .add("instanceId", getInstanceId())
-      .add("tableId", getTableId())
-      .add("bigtableOptionsConfigurator",
-        getBigtableOptionsConfigurator() == null ? null : 
getBigtableOptionsConfigurator()
-          .getClass().getName())
-      .add("options", getBigtableOptions())
-      .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
-      .toString();
+  boolean isDataAccessible() {
+    return getTableId().isAccessible()
+      && (getProjectId() == null || getProjectId().isAccessible())
+      && (getInstanceId() == null || getInstanceId().isAccessible());
   }
 
   private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
@@ -249,13 +234,26 @@ abstract class BigtableConfig implements Serializable {
     }
 
     if (getInstanceId() != null) {
-      effectiveOptions.setInstanceId(getInstanceId());
+      effectiveOptions.setInstanceId(getInstanceId().get());
     }
 
     if (getProjectId() != null) {
-      effectiveOptions.setProjectId(getProjectId());
+      effectiveOptions.setProjectId(getProjectId().get());
     }
 
     return effectiveOptions;
   }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(BigtableConfig.class)
+      .add("projectId", getProjectId())
+      .add("instanceId", getInstanceId())
+      .add("tableId", getTableId())
+      .add("bigtableOptionsConfigurator",
+        getBigtableOptionsConfigurator() == null ? null : 
getBigtableOptionsConfigurator()
+          .getClass().getName())
+      .add("options", getBigtableOptions())
+      .toString();
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 71c0415..4278f53 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -28,6 +28,7 @@ import com.google.bigtable.v2.Row;
 import com.google.bigtable.v2.RowFilter;
 import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.ImmutableList;
@@ -54,6 +55,7 @@ import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -151,11 +153,11 @@ public class BigtableIO {
 
   /**
    * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code 
Read} must be
-   * initialized with a
-   * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) 
BigtableOptions} that specifies
-   * the source Cloud Bigtable instance, and a {@link 
BigtableIO.Read#withTableId tableId} that
-   * specifies which table to read. A {@link RowFilter} may also optionally be 
specified using
-   * {@link BigtableIO.Read#withRowFilter}.
+   * initialized with a {@link BigtableIO.Read#withInstanceId} and
+   * {@link BigtableIO.Read#withProjectId} that specifies the source Cloud 
Bigtable
+   * instance, and a {@link BigtableIO.Read#withTableId} that specifies which 
table to
+   * read. A {@link RowFilter} may also optionally be specified using
+   * {@link BigtableIO.Read#withRowFilter(RowFilter)}.
    */
   @Experimental
   public static Read read() {
@@ -164,10 +166,10 @@ public class BigtableIO {
 
   /**
    * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code 
Write} must be
-   * initialized with a
-   * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) 
BigtableOptions} that specifies
-   * the destination Cloud Bigtable instance, and a {@link 
BigtableIO.Write#withTableId tableId}
-   * that specifies which table to write.
+   * initialized with a {@link BigtableIO.Write#withProjectId} and
+   * {@link BigtableIO.Write#withInstanceId} that specifies the destination 
Cloud
+   * Bigtable instance, and a {@link BigtableIO.Write#withTableId} that 
specifies
+   * which table to write.
    */
   @Experimental
   public static Write write() {
@@ -196,7 +198,8 @@ public class BigtableIO {
     /** Returns the table being read from. */
     @Nullable
     public String getTableId() {
-      return getBigtableConfig().getTableId();
+      ValueProvider<String> tableId = getBigtableConfig().getTableId();
+      return tableId != null && tableId.isAccessible() ? tableId.get() : null;
     }
 
     /**
@@ -213,7 +216,7 @@ public class BigtableIO {
 
     static Read create() {
       BigtableConfig config = BigtableConfig.builder()
-        .setTableId("")
+        .setTableId(ValueProvider.StaticValueProvider.of(""))
         .setValidate(true)
         .build();
 
@@ -237,35 +240,76 @@ public class BigtableIO {
 
     /**
      * Returns a new {@link BigtableIO.Read} that will read from the Cloud 
Bigtable project
-     * indicated by given parameter, requires {@link #withInstanceId(String)} 
to be called to
+     * indicated by given parameter, requires {@link #withInstanceId} to be 
called to
      * determine the instance.
      *
      * <p>Does not modify this object.
      */
-    public Read withProjectId(String projectId) {
+    public Read withProjectId(ValueProvider<String> projectId) {
       BigtableConfig config = getBigtableConfig();
       return 
toBuilder().setBigtableConfig(config.withProjectId(projectId)).build();
     }
 
     /**
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud 
Bigtable project
+     * indicated by given parameter, requires {@link #withInstanceId} to be 
called to
+     * determine the instance.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withProjectId(String projectId) {
+      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+    }
+
+    /**
      * Returns a new {@link BigtableIO.Read} that will read from the Cloud 
Bigtable instance
-     * indicated by given parameter, requires {@link #withProjectId(String)} 
to be called to
+     * indicated by given parameter, requires {@link #withProjectId} to be 
called to
      * determine the project.
      *
      * <p>Does not modify this object.
      */
-    public Read withInstanceId(String instanceId) {
+    public Read withInstanceId(ValueProvider<String> instanceId) {
       BigtableConfig config = getBigtableConfig();
       return 
toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build();
     }
 
     /**
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud 
Bigtable instance
+     * indicated by given parameter, requires {@link #withProjectId} to be 
called to
+     * determine the project.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withInstanceId(String instanceId) {
+      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the specified 
table.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withTableId(ValueProvider<String> tableId) {
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the specified 
table.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withTableId(String tableId) {
+      return withTableId(ValueProvider.StaticValueProvider.of(tableId));
+    }
+
+    /**
      * WARNING: Should be used only to specify additional parameters for 
connection
      * to the Cloud Bigtable, instanceId and projectId should be provided over
-     * {@link #withInstanceId(String)} and {@link #withProjectId(String)} 
respectively.
+     * {@link #withInstanceId} and {@link #withProjectId} respectively.
      *
      * <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud 
Bigtable instance
-     * indicated by {@link #withProjectId(String)}, and using any other 
specified customizations.
+     * indicated by {@link #withProjectId}, and using any other specified 
customizations.
      *
      * <p>Does not modify this object.
      *
@@ -280,7 +324,7 @@ public class BigtableIO {
     /**
      * WARNING: Should be used only to specify additional parameters for 
connection to
      * the Cloud Bigtable, instanceId and projectId should be provided over
-     * {@link #withInstanceId(String)} and {@link #withProjectId(String)} 
respectively.
+     * {@link #withInstanceId} and {@link #withProjectId} respectively.
      *
      * <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud 
Bigtable instance
      * indicated by the given options, and using any other specified 
customizations.
@@ -306,7 +350,7 @@ public class BigtableIO {
      * with customized options provided by given configurator.
      *
      * <p>WARNING: instanceId and projectId should not be provided here and 
should be provided over
-     * {@link #withProjectId(String)} and {@link #withInstanceId(String)}.
+     * {@link #withProjectId} and {@link #withInstanceId}.
      *
      * <p>Does not modify this object.
      */
@@ -354,16 +398,6 @@ public class BigtableIO {
       return toBuilder().setKeyRanges(keyRanges).build();
     }
 
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the specified 
table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withTableId(String tableId) {
-      BigtableConfig config = getBigtableConfig();
-      return 
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
-    }
-
     /** Disables validation that the table being read from exists. */
     public Read withoutValidation() {
       BigtableConfig config = getBigtableConfig();
@@ -378,6 +412,7 @@ public class BigtableIO {
      *
      * <p>Does not modify this object.
      */
+    @VisibleForTesting
     Read withBigtableService(BigtableService bigtableService) {
       BigtableConfig config = getBigtableConfig();
       return 
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
@@ -388,12 +423,7 @@ public class BigtableIO {
       getBigtableConfig().validate();
 
       BigtableSource source =
-          new BigtableSource(new SerializableFunction<PipelineOptions, 
BigtableService>() {
-            @Override
-            public BigtableService apply(PipelineOptions options) {
-              return getBigtableConfig().getBigtableService(options);
-            }
-          }, getTableId(), getRowFilter(), getKeyRanges(), null);
+          new BigtableSource(getBigtableConfig(), getRowFilter(), 
getKeyRanges(), null);
       return 
input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
     }
 
@@ -476,7 +506,7 @@ public class BigtableIO {
 
     static Write create() {
       BigtableConfig config = BigtableConfig.builder()
-        .setTableId("")
+        .setTableId(ValueProvider.StaticValueProvider.of(""))
         .setValidate(true)
         .setBigtableOptionsConfigurator(enableBulkApiConfigurator(null))
         .build();
@@ -494,32 +524,73 @@ public class BigtableIO {
 
     /**
      * Returns a new {@link BigtableIO.Read} that will write into the Cloud 
Bigtable project
-     * indicated by given parameter, requires {@link #withInstanceId(String)}
+     * indicated by given parameter, requires {@link #withInstanceId}
      * to be called to determine the instance.
      *
      * <p>Does not modify this object.
      */
-    public Write withProjectId(String projectId) {
+    public Write withProjectId(ValueProvider<String> projectId) {
       BigtableConfig config = getBigtableConfig();
       return 
toBuilder().setBigtableConfig(config.withProjectId(projectId)).build();
     }
 
     /**
+     * Returns a new {@link BigtableIO.Read} that will write into the Cloud 
Bigtable project
+     * indicated by given parameter, requires {@link #withInstanceId}
+     * to be called to determine the instance.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withProjectId(String projectId) {
+      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+    }
+
+    /**
      * Returns a new {@link BigtableIO.Read} that will write into the Cloud 
Bigtable instance
-     * indicated by given parameter, requires {@link #withProjectId(String)} 
to be called to
+     * indicated by given parameter, requires {@link #withProjectId} to be 
called to
      * determine the project.
      *
      * <p>Does not modify this object.
      */
-    public Write withInstanceId(String instanceId) {
+    public Write withInstanceId(ValueProvider<String> instanceId) {
       BigtableConfig config = getBigtableConfig();
       return 
toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build();
     }
 
     /**
+     * Returns a new {@link BigtableIO.Read} that will write into the Cloud 
Bigtable instance
+     * indicated by given parameter, requires {@link #withProjectId} to be 
called to
+     * determine the project.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withInstanceId(String instanceId) {
+      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} that will write to the specified 
table.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withTableId(ValueProvider<String> tableId) {
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} that will write to the specified 
table.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withTableId(String tableId) {
+      return withTableId(ValueProvider.StaticValueProvider.of(tableId));
+    }
+
+    /**
      * WARNING: Should be used only to specify additional parameters for 
connection to
      * the Cloud Bigtable, instanceId and projectId should be provided over
-     * {@link #withInstanceId(String)} and {@link #withProjectId(String)} 
respectively.
+     * {@link #withInstanceId} and {@link #withProjectId} respectively.
      *
      * <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud 
Bigtable instance
      * indicated by the given options, and using any other specified 
customizations.
@@ -537,7 +608,7 @@ public class BigtableIO {
     /**
      * WARNING: Should be used only to specify additional parameters for 
connection
      * to the Cloud Bigtable, instanceId and projectId should be provided over
-     * {@link #withInstanceId(String)} and {@link #withProjectId(String)} 
respectively.
+     * {@link #withInstanceId} and {@link #withProjectId} respectively.
      *
      * <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud 
Bigtable instance
      * indicated by the given options, and using any other specified 
customizations.
@@ -563,7 +634,7 @@ public class BigtableIO {
      * with customized options provided by given configurator.
      *
      * <p>WARNING: instanceId and projectId should not be provided here and 
should be provided over
-     * {@link #withProjectId(String)} and {@link #withInstanceId(String)}.
+     * {@link #withProjectId} and {@link #withInstanceId}.
      *
      * <p>Does not modify this object.
      */
@@ -583,16 +654,6 @@ public class BigtableIO {
     }
 
     /**
-     * Returns a new {@link BigtableIO.Write} that will write to the specified 
table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableId(String tableId) {
-      BigtableConfig config = getBigtableConfig();
-      return 
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
-    }
-
-    /**
      * Returns a new {@link BigtableIO.Write} that will write using the given 
Cloud Bigtable
      * service implementation.
      *
@@ -609,13 +670,7 @@ public class BigtableIO {
     public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) 
{
       getBigtableConfig().validate();
 
-      input.apply(ParDo.of(new 
BigtableWriterFn(getBigtableConfig().getTableId(),
-          new SerializableFunction<PipelineOptions, BigtableService>() {
-            @Override
-            public BigtableService apply(PipelineOptions options) {
-              return getBigtableConfig().getBigtableService(options);
-            }
-          })));
+      input.apply(ParDo.of(new BigtableWriterFn(getBigtableConfig())));
       return PDone.in(input.getPipeline());
     }
 
@@ -639,19 +694,16 @@ public class BigtableIO {
 
     private class BigtableWriterFn extends DoFn<KV<ByteString, 
Iterable<Mutation>>, Void> {
 
-      public BigtableWriterFn(String tableId,
-          SerializableFunction<PipelineOptions, BigtableService> 
bigtableServiceFactory) {
-        this.tableId = checkNotNull(tableId, "tableId");
-        this.bigtableServiceFactory =
-            checkNotNull(bigtableServiceFactory, "bigtableServiceFactory");
+      public BigtableWriterFn(BigtableConfig bigtableConfig) {
+        this.config = bigtableConfig;
         this.failures = new ConcurrentLinkedQueue<>();
       }
 
       @StartBundle
       public void startBundle(StartBundleContext c) throws IOException {
         if (bigtableWriter == null) {
-          bigtableWriter = bigtableServiceFactory.apply(
-              c.getPipelineOptions()).openForWriting(tableId);
+          bigtableWriter = config.getBigtableService(
+              
c.getPipelineOptions()).openForWriting(config.getTableId().get());
         }
         recordsWritten = 0;
       }
@@ -687,8 +739,7 @@ public class BigtableIO {
       }
 
       
///////////////////////////////////////////////////////////////////////////////
-      private final String tableId;
-      private final SerializableFunction<PipelineOptions, BigtableService> 
bigtableServiceFactory;
+      private final BigtableConfig config;
       private BigtableService.Writer bigtableWriter;
       private long recordsWritten;
       private final ConcurrentLinkedQueue<BigtableWriteException> failures;
@@ -756,13 +807,11 @@ public class BigtableIO {
 
   static class BigtableSource extends BoundedSource<Row> {
     public BigtableSource(
-        SerializableFunction<PipelineOptions, BigtableService> serviceFactory,
-        String tableId,
+        BigtableConfig config,
         @Nullable RowFilter filter,
         List<ByteKeyRange> ranges,
         @Nullable Long estimatedSizeBytes) {
-      this.serviceFactory = serviceFactory;
-      this.tableId = tableId;
+      this.config = config;
       this.filter = filter;
       this.ranges = ranges;
       this.estimatedSizeBytes = estimatedSizeBytes;
@@ -771,7 +820,7 @@ public class BigtableIO {
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(BigtableSource.class)
-          .add("tableId", tableId)
+          .add("config", config)
           .add("filter", filter)
           .add("ranges", ranges)
           .add("estimatedSizeBytes", estimatedSizeBytes)
@@ -779,8 +828,7 @@ public class BigtableIO {
     }
 
     ////// Private state and internal implementation details //////
-    private final SerializableFunction<PipelineOptions, BigtableService> 
serviceFactory;
-    private final String tableId;
+    private final BigtableConfig config;
     @Nullable private final RowFilter filter;
     private final List<ByteKeyRange> ranges;
     @Nullable private Long estimatedSizeBytes;
@@ -791,13 +839,13 @@ public class BigtableIO {
      */
     protected BigtableSource withSingleRange(ByteKeyRange range) {
       checkArgument(range != null, "range can not be null");
-      return new BigtableSource(serviceFactory, tableId, filter,
+      return new BigtableSource(config, filter,
           Arrays.asList(range), estimatedSizeBytes);
     }
 
     protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
       checkArgument(estimatedSizeBytes != null, "estimatedSizeBytes can not be 
null");
-      return new BigtableSource(serviceFactory, tableId, filter, ranges, 
estimatedSizeBytes);
+      return new BigtableSource(config, filter, ranges, estimatedSizeBytes);
     }
 
     /**
@@ -807,7 +855,7 @@ public class BigtableIO {
      */
     private List<SampleRowKeysResponse> getSampleRowKeys(PipelineOptions 
pipelineOptions)
         throws IOException {
-      return serviceFactory.apply(pipelineOptions).getSampleRowKeys(this);
+      return config.getBigtableService(pipelineOptions).getSampleRowKeys(this);
     }
 
     @Override
@@ -960,19 +1008,21 @@ public class BigtableIO {
 
     @Override
     public BoundedReader<Row> createReader(PipelineOptions options) throws 
IOException {
-      return new BigtableReader(this, serviceFactory.apply(options));
+      return new BigtableReader(this, config.getBigtableService(options));
     }
 
     @Override
     public void validate() {
-      checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
+      ValueProvider<String> tableId = config.getTableId();
+      checkArgument(tableId != null && tableId.isAccessible() && 
!tableId.get().isEmpty(),
+        "tableId was not supplied");
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      builder.add(DisplayData.item("tableId", tableId)
+      builder.add(DisplayData.item("tableId", config.getTableId().get())
           .withLabel("Table ID"));
 
       if (filter != null) {
@@ -1030,8 +1080,8 @@ public class BigtableIO {
       return filter;
     }
 
-    public String getTableId() {
-      return tableId;
+    public ValueProvider<String> getTableId() {
+      return config.getTableId();
     }
   }
 
@@ -1156,8 +1206,8 @@ public class BigtableIO {
   }
 
   static void validateTableExists(BigtableConfig config, PipelineOptions 
options) {
-    if (config.getValidate()) {
-      String tableId = config.getTableId();
+    if (config.getValidate() && config.isDataAccessible()) {
+      String tableId = checkNotNull(config.getTableId().get());
       try {
         checkArgument(
           config.getBigtableService(options).tableExists(tableId),
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 06c459b..af756a8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -119,7 +119,7 @@ class BigtableServiceImpl implements BigtableService {
       ReadRowsRequest.Builder requestB =
           ReadRowsRequest.newBuilder()
               .setRows(rowSet)
-              
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId()));
+              
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId().get()));
       if (source.getRowFilter() != null) {
         requestB.setFilter(source.getRowFilter());
       }
@@ -249,7 +249,7 @@ class BigtableServiceImpl implements BigtableService {
     try (BigtableSession session = new BigtableSession(options)) {
       SampleRowKeysRequest request =
           SampleRowKeysRequest.newBuilder()
-              
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId()))
+              
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId().get()))
               .build();
       return session.getDataClient().sampleRowKeys(request);
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTest.java
new file mode 100644
index 0000000..0f44f8c
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLabel;
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
+import static org.hamcrest.Matchers.allOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.BulkOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link BigtableConfig}.
+ */
+@RunWith(JUnit4.class)
+public class BigtableConfigTest {
+
+  static final ValueProvider<String> NOT_ACCESSIBLE_VALUE = new 
ValueProvider<String>() {
+    @Override
+    public String get() {
+      throw new IllegalStateException("Value is not accessible");
+    }
+
+    @Override
+    public boolean isAccessible() {
+      return false;
+    }
+  };
+
+  static final ValueProvider<String> PROJECT_ID =
+    ValueProvider.StaticValueProvider.of("project_id");
+
+  static final ValueProvider<String> INSTANCE_ID =
+    ValueProvider.StaticValueProvider.of("instance_id");
+
+  static final ValueProvider<String> TABLE_ID =  
ValueProvider.StaticValueProvider.of("table");
+
+  static final
+  SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
CONFIGURATOR =
+    new SerializableFunction<BigtableOptions.Builder, 
BigtableOptions.Builder>() {
+      @Override
+      public BigtableOptions.Builder apply(BigtableOptions.Builder input) {
+        return input;
+      }
+    };
+
+  static final BigtableService SERVICE = Mockito.mock(BigtableService.class);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private BigtableConfig config;
+
+  @Before
+  public void setup() throws Exception {
+    config = BigtableConfig.builder().setValidate(false).build();
+  }
+
+  @Test
+  public void testWithProjectId() {
+    assertEquals(PROJECT_ID.get(), 
config.withProjectId(PROJECT_ID).getProjectId().get());
+
+    thrown.expect(IllegalArgumentException.class);
+    config.withProjectId(null);
+  }
+
+  @Test
+  public void testWithInstanceId() {
+    assertEquals(INSTANCE_ID.get(), 
config.withInstanceId(INSTANCE_ID).getInstanceId().get());
+
+    thrown.expect(IllegalArgumentException.class);
+    config.withInstanceId(null);
+  }
+
+  @Test
+  public void testWithTableId() {
+    assertEquals(TABLE_ID.get(), 
config.withTableId(TABLE_ID).getTableId().get());
+
+    thrown.expect(IllegalArgumentException.class);
+    config.withTableId(null);
+  }
+
+  @Test
+  public void testWithBigtableOptionsConfigurator() {
+    assertEquals(CONFIGURATOR,
+      
config.withBigtableOptionsConfigurator(CONFIGURATOR).getBigtableOptionsConfigurator());
+
+    thrown.expect(IllegalArgumentException.class);
+    config.withBigtableOptionsConfigurator(null);
+  }
+
+  @Test
+  public void testWithValidate() {
+    assertEquals(true, config.withValidate(true).getValidate());
+  }
+
+  @Test
+  public void testWithBigtableService() {
+    assertEquals(SERVICE, 
config.withBigtableService(SERVICE).getBigtableService());
+
+    thrown.expect(IllegalArgumentException.class);
+    config.withBigtableService(null);
+  }
+
+  @Test
+  public void testValidate() {
+    config.withProjectId(PROJECT_ID)
+      .withInstanceId(INSTANCE_ID)
+      .withTableId(TABLE_ID)
+      .validate();
+  }
+
+  @Test
+  public void testValidateFailsWithoutProjectId() {
+    config.withInstanceId(INSTANCE_ID)
+      .withTableId(TABLE_ID);
+
+    thrown.expect(IllegalArgumentException.class);
+    config.validate();
+  }
+
+  @Test
+  public void testValidateFailsWithoutInstanceId() {
+    config.withProjectId(PROJECT_ID)
+      .withTableId(TABLE_ID);
+
+    thrown.expect(IllegalArgumentException.class);
+    config.validate();
+  }
+
+  @Test
+  public void testValidateFailsWithoutTableId() {
+    config.withProjectId(PROJECT_ID)
+      .withInstanceId(INSTANCE_ID);
+
+    thrown.expect(IllegalArgumentException.class);
+    config.validate();
+  }
+
+  @Test
+  public void testPopulateDisplayData() {
+    DisplayData displayData = DisplayData.from(new HasDisplayData() {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        config.withProjectId(PROJECT_ID)
+          .withInstanceId(INSTANCE_ID)
+          .withTableId(TABLE_ID)
+          .populateDisplayData(builder);
+      }
+    });
+
+    assertThat(displayData, hasDisplayItem(allOf(
+      hasKey("projectId"),
+      hasLabel("Bigtable Project Id"),
+      hasValue(PROJECT_ID.get()))));
+
+    assertThat(displayData, hasDisplayItem(allOf(
+      hasKey("instanceId"),
+      hasLabel("Bigtable Instance Id"),
+      hasValue(INSTANCE_ID.get()))));
+
+    assertThat(displayData, hasDisplayItem(allOf(
+      hasKey("tableId"),
+      hasLabel("Bigtable Table Id"),
+      hasValue(TABLE_ID.get()))));
+  }
+
+  @Test
+  public void testGetBigtableServiceWithDefaultService() {
+    assertEquals(SERVICE, 
config.withBigtableService(SERVICE).getBigtableService());
+  }
+
+  @Test
+  public void testGetBigtableServiceWithConfigurator() {
+    SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
configurator =
+      new SerializableFunction<BigtableOptions.Builder, 
BigtableOptions.Builder>() {
+        @Override
+        public BigtableOptions.Builder apply(BigtableOptions.Builder input) {
+          return input
+            .setInstanceId(INSTANCE_ID.get() + INSTANCE_ID.get())
+            .setProjectId(PROJECT_ID.get() + PROJECT_ID.get())
+            .setBulkOptions(new 
BulkOptions.Builder().setUseBulkApi(true).build());
+        }
+      };
+
+    BigtableService service = config
+      .withProjectId(PROJECT_ID)
+      .withInstanceId(INSTANCE_ID)
+      .withBigtableOptionsConfigurator(configurator)
+      .getBigtableService(PipelineOptionsFactory.as(GcpOptions.class));
+
+    assertEquals(PROJECT_ID.get(), 
service.getBigtableOptions().getProjectId());
+    assertEquals(INSTANCE_ID.get(), 
service.getBigtableOptions().getInstanceId());
+    assertEquals(true, 
service.getBigtableOptions().getBulkOptions().useBulkApi());
+  }
+
+  @Test
+  public void testIsDataAccessible() {
+    
assertTrue(config.withTableId(TABLE_ID).withProjectId(PROJECT_ID).withInstanceId(INSTANCE_ID)
+      .isDataAccessible());
+    assertTrue(config.withTableId(TABLE_ID).withProjectId(PROJECT_ID)
+      .withBigtableOptions(new 
BigtableOptions.Builder().setInstanceId("instance_id").build())
+      .isDataAccessible());
+    assertTrue(config.withTableId(TABLE_ID).withInstanceId(INSTANCE_ID)
+      .withBigtableOptions(new 
BigtableOptions.Builder().setProjectId("project_id").build())
+      .isDataAccessible());
+    assertTrue(config.withTableId(TABLE_ID).withBigtableOptions(
+      new 
BigtableOptions.Builder().setProjectId("project_id").setInstanceId("instance_id").build())
+      .isDataAccessible());
+
+    
assertFalse(config.withTableId(NOT_ACCESSIBLE_VALUE).withProjectId(PROJECT_ID)
+      .withInstanceId(INSTANCE_ID).isDataAccessible());
+    
assertFalse(config.withTableId(TABLE_ID).withProjectId(NOT_ACCESSIBLE_VALUE)
+      .withInstanceId(INSTANCE_ID).isDataAccessible());
+    assertFalse(config.withTableId(TABLE_ID).withProjectId(PROJECT_ID)
+      .withInstanceId(NOT_ACCESSIBLE_VALUE).isDataAccessible());
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 4b878a4..fe894ca 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -86,8 +86,8 @@ import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -115,14 +115,20 @@ public class BigtableIOTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
 
-  private static FakeBigtableService service;
-  private static SerializableFunction<PipelineOptions, BigtableService> 
serviceFactory =
-      new SerializableFunction<PipelineOptions, BigtableService>() {
-        @Override
-        public BigtableService apply(PipelineOptions input) {
-          return service;
-        }
+  static final ValueProvider<String> NOT_ACCESSIBLE_VALUE = new 
ValueProvider<String>() {
+    @Override
+    public String get() {
+      throw new IllegalStateException("Value is not accessible");
+    }
+
+    @Override
+    public boolean isAccessible() {
+      return false;
+    }
   };
+
+  private static BigtableConfig config;
+  private static FakeBigtableService service;
   private static final BigtableOptions BIGTABLE_OPTIONS =
       new BigtableOptions.Builder()
           .setProjectId("options_project")
@@ -151,6 +157,11 @@ public class BigtableIOTest {
     defaultRead = defaultRead.withBigtableService(service);
     defaultWrite = defaultWrite.withBigtableService(service);
     bigtableCoder = p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
+
+    config = BigtableConfig.builder()
+      .setValidate(true)
+      .setBigtableService(service)
+      .build();
   }
 
   private static ByteKey makeByteKey(ByteString key) {
@@ -167,8 +178,8 @@ public class BigtableIOTest {
             .withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
     assertEquals("options_project", read.getBigtableOptions().getProjectId());
     assertEquals("options_instance", 
read.getBigtableOptions().getInstanceId());
-    assertEquals("instance", read.getBigtableConfig().getInstanceId());
-    assertEquals("project", read.getBigtableConfig().getProjectId());
+    assertEquals("instance", read.getBigtableConfig().getInstanceId().get());
+    assertEquals("project", read.getBigtableConfig().getProjectId().get());
     assertEquals("table", read.getTableId());
     assertEquals(PORT_CONFIGURATOR, 
read.getBigtableConfig().getBigtableOptionsConfigurator());
   }
@@ -178,7 +189,6 @@ public class BigtableIOTest {
     BigtableIO.Read read = 
BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
 
     thrown.expect(IllegalArgumentException.class);
-
     read.expand(null);
   }
 
@@ -222,11 +232,11 @@ public class BigtableIOTest {
             .withTableId("table")
             .withInstanceId("instance")
             .withProjectId("project");
-    assertEquals("table", write.getBigtableConfig().getTableId());
+    assertEquals("table", write.getBigtableConfig().getTableId().get());
     assertEquals("options_project", write.getBigtableOptions().getProjectId());
     assertEquals("options_instance", 
write.getBigtableOptions().getInstanceId());
-    assertEquals("instance", write.getBigtableConfig().getInstanceId());
-    assertEquals("project", write.getBigtableConfig().getProjectId());
+    assertEquals("instance", write.getBigtableConfig().getInstanceId().get());
+    assertEquals("project", write.getBigtableConfig().getProjectId().get());
   }
 
   @Test
@@ -532,12 +542,8 @@ public class BigtableIOTest {
     service.setupSampleRowKeys(table, numSamples, bytesPerRow);
 
     BigtableSource source =
-        new BigtableSource(
-            serviceFactory,
-            table,
-            null,
-            Arrays.asList(service.getTableRange(table)),
-            null);
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)), 
null,
+          Arrays.asList(service.getTableRange(table)), null);
     assertSplitAtFractionExhaustive(source, null);
   }
 
@@ -554,11 +560,8 @@ public class BigtableIOTest {
     service.setupSampleRowKeys(table, numSamples, bytesPerRow);
 
     BigtableSource source =
-        new BigtableSource(serviceFactory,
-            table,
-            null,
-            Arrays.asList(service.getTableRange(table)),
-            null);
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+          null, Arrays.asList(service.getTableRange(table)), null);
     // With 0 items read, all split requests will fail.
     assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
     assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
@@ -588,8 +591,7 @@ public class BigtableIOTest {
 
     // Generate source and split it.
     BigtableSource source =
-        new BigtableSource(serviceFactory,
-            table,
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
             null /*filter*/,
             Arrays.asList(ByteKeyRange.ALL_KEYS),
             null /*size*/);
@@ -626,14 +628,12 @@ public class BigtableIOTest {
         tableRange.withStartKey(splitKey2));
     // Generate source and split it.
     BigtableSource source =
-        new BigtableSource(serviceFactory,
-            table,
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
             null /*filter*/,
             keyRanges,
             null /*size*/);
     BigtableSource referenceSource =
-        new BigtableSource(serviceFactory,
-            table,
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
             null /*filter*/,
             ImmutableList.of(service.getTableRange(table)),
             null /*size*/);
@@ -660,8 +660,7 @@ public class BigtableIOTest {
 
     // Generate source and split it.
     BigtableSource source =
-        new BigtableSource(serviceFactory,
-        table,
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
         null /*filter*/,
         Arrays.asList(ByteKeyRange.ALL_KEYS),
         null /*size*/);
@@ -702,14 +701,12 @@ public class BigtableIOTest {
         tableRange.withStartKey(splitKey2));
     // Generate source and split it.
     BigtableSource source =
-        new BigtableSource(serviceFactory,
-            table,
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
             null /*filter*/,
             keyRanges,
             null /*size*/);
     BigtableSource referenceSource =
-        new BigtableSource(serviceFactory,
-            table,
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
             null /*filter*/,
             ImmutableList.of(service.getTableRange(table)),
             null /*size*/);
@@ -737,12 +734,8 @@ public class BigtableIOTest {
     RowFilter filter =
         
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
     BigtableSource source =
-        new BigtableSource(
-            serviceFactory,
-            table,
-            filter,
-            Arrays.asList(ByteKeyRange.ALL_KEYS),
-            null /*size*/);
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+          filter, Arrays.asList(ByteKeyRange.ALL_KEYS), null /*size*/);
     List<BigtableSource> splits = source.split(numRows * bytesPerRow / 
numSplits, null);
 
     // Test num splits and split equality.
@@ -767,7 +760,7 @@ public class BigtableIOTest {
 
     assertThat(displayData, hasDisplayItem(allOf(
         hasKey("tableId"),
-        hasLabel("Table ID"),
+        hasLabel("Bigtable Table Id"),
         hasValue("fooTable"))));
 
     assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
@@ -867,6 +860,18 @@ public class BigtableIOTest {
     p.run();
   }
 
+  /** Tests that when writing to a non-existent table, the write fails. */
+  @Test
+  public void testTableCheckIgnoredWhenCanNotAccessConfig() throws Exception {
+    PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
+      p.apply(
+        Create.empty(
+          KvCoder.of(ByteStringCoder.of(), 
IterableCoder.of(ProtoCoder.of(Mutation.class)))));
+
+    emptyInput.apply("write", defaultWrite.withTableId(NOT_ACCESSIBLE_VALUE));
+    p.run();
+  }
+
   /** Tests that when writing an element fails, the write fails. */
   @Test
   public void testWritingFailsBadElement() throws Exception {
@@ -903,7 +908,8 @@ public class BigtableIOTest {
     makeTableData(table, numRows);
 
     BigtableSource source =
-        new BigtableSource(serviceFactory, table, null, 
Arrays.asList(ByteKeyRange.ALL_KEYS), null);
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+          null, Arrays.asList(ByteKeyRange.ALL_KEYS), null);
 
     BoundedReader<Row> reader = 
source.createReader(TestPipeline.testingPipelineOptions());
 
@@ -1058,8 +1064,8 @@ public class BigtableIOTest {
 
     @Override
     public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) 
{
-      List<SampleRowKeysResponse> samples = 
sampleRowKeys.get(source.getTableId());
-      checkNotNull(samples, "No samples found for table %s", 
source.getTableId());
+      List<SampleRowKeysResponse> samples = 
sampleRowKeys.get(source.getTableId().get());
+      checkNotNull(samples, "No samples found for table %s", 
source.getTableId().get());
       return samples;
     }
 
@@ -1114,12 +1120,12 @@ public class BigtableIOTest {
         checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is 
supported");
         filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
       }
-      service.verifyTableExists(source.getTableId());
+      service.verifyTableExists(source.getTableId().get());
     }
 
     @Override
     public boolean start() {
-      rows = service.tables.get(source.getTableId()).entrySet().iterator();
+      rows = 
service.tables.get(source.getTableId().get()).entrySet().iterator();
       return advance();
     }
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to