[ 
https://issues.apache.org/jira/browse/BEAM-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286771#comment-16286771
 ] 

ASF GitHub Bot commented on BEAM-3008:
--------------------------------------

chamikaramj closed pull request #4205: [BEAM-3008] Adds BigtableOptions 
configurator to the BigtableIO
URL: https://github.com/apache/beam/pull/4205
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 8b4609da224..febdc1f53b0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -122,43 +122,19 @@
  * idempotent transformation to that row.
  *
  * <p>To configure a Cloud Bigtable sink, you must supply a table id, a 
project id, an instance id
- * and optionally and optionally a {@link BigtableOptions} to provide more 
specific connection
- * configuration, for example:
+ * and optionally a configuration function for {@link BigtableOptions} to 
provide more specific
+ * connection configuration, for example:
  *
  * <pre>{@code
  * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
  *
  * data.apply("write",
  *     BigtableIO.write()
- *         .setProjectId("project")
- *         .setInstanceId("instance")
+ *         .withProjectId("project")
+ *         .withInstanceId("instance")
  *         .withTableId("table"));
  * }</pre>
  *
- * <h3>Using local emulator</h3>
- *
- * <p>In order to use local emulator for Bigtable you should use:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- *     new BigtableOptions.Builder()
- *         .setUsePlaintextNegotiation(true)
- *         .setCredentialOptions(CredentialOptions.nullCredential())
- *         .setDataHost("127.0.0.1") // network interface where Bigtable 
emulator is bound
- *         .setInstanceAdminHost("127.0.0.1")
- *         .setTableAdminHost("127.0.0.1")
- *         .setPort(LOCAL_EMULATOR_PORT))
- *
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
- *
- * data.apply("write",
- *     BigtableIO.write()
- *         .withBigtableOptions(optionsBuilder)
- *         .setProjectId("project")
- *         .setInstanceId("instance")
- *         .withTableId("table");
- * }</pre>
- *
  * <h3>Experimental</h3>
  *
  * <p>This connector for Cloud Bigtable is considered experimental and may 
break or receive
@@ -239,12 +215,23 @@ public static Write write() {
     @Nullable
     abstract BigtableService getBigtableService();
 
-    /** Returns the Google Cloud Bigtable instance being read from, and other 
parameters. */
+    /**
+     * Returns the Google Cloud Bigtable instance being read from, and other 
parameters.
+     * @deprecated will be replaced by bigtable options configurator.
+     */
+    @Deprecated
     @Nullable
     public abstract BigtableOptions getBigtableOptions();
 
     public abstract boolean getValidate();
 
+    /**
+     * Configurator of the effective Bigtable Options.
+     */
+    @Nullable
+    abstract SerializableFunction<BigtableOptions.Builder,
+      BigtableOptions.Builder> getBigtableOptionsConfigurator();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -260,12 +247,17 @@ public static Write write() {
 
       abstract Builder setTableId(String tableId);
 
+      /** @deprecated will be replaced by bigtable options configurator. */
+      @Deprecated
       abstract Builder setBigtableOptions(BigtableOptions options);
 
       abstract Builder setBigtableService(BigtableService bigtableService);
 
       abstract Builder setValidate(boolean validate);
 
+      abstract Builder setBigtableOptionsConfigurator(
+        SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
optionsConfigurator);
+
       abstract Read build();
     }
 
@@ -302,7 +294,10 @@ public Read withInstanceId(String instanceId) {
      * indicated by {@link #withProjectId(String)}, and using any other 
specified customizations.
      *
      * <p>Does not modify this object.
+     *
+     * @deprecated will be replaced by bigtable options configurator.
      */
+    @Deprecated
     public Read withBigtableOptions(BigtableOptions options) {
       checkArgument(options != null, "options can not be null");
       return withBigtableOptions(options.toBuilder());
@@ -320,17 +315,29 @@ public Read withBigtableOptions(BigtableOptions options) {
      * will have no effect on the returned {@link BigtableIO.Read}.
      *
      * <p>Does not modify this object.
+     *
+     * @deprecated will be replaced by bigtable options configurator.
      */
+    @Deprecated
     public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
       checkArgument(optionsBuilder != null, "optionsBuilder can not be null");
       // TODO: is there a better way to clone a Builder? Want it to be immune 
from user changes.
-      BigtableOptions options = optionsBuilder.build();
-
-      BigtableOptions.Builder clonedBuilder = options.toBuilder()
-          .setUseCachedDataPool(true);
-      BigtableOptions clonedOptions = clonedBuilder.build();
+      return 
toBuilder().setBigtableOptions(optionsBuilder.build().toBuilder().build()).build();
+    }
 
-      return toBuilder().setBigtableOptions(clonedOptions).build();
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud 
Bigtable instance
+     * with customized options provided by given configurator.
+     *
+     * <p>WARNING: instanceId and projectId should not be set here; provide 
them via
+     * {@link #withProjectId(String)} and {@link #withInstanceId(String)} instead.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withBigtableOptionsConfigurator(
+      SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
configurator) {
+      checkArgument(configurator != null, "configurator can not be null");
+      return toBuilder().setBigtableOptionsConfigurator(configurator).build();
     }
 
     /**
@@ -427,17 +434,25 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
         builder.add(DisplayData.item("rowFilter", getRowFilter().toString())
           .withLabel("Table Row Filter"));
       }
+
+      builder.add(DisplayData.item("effectiveBigtableOptions",
+        effectiveUserProvidedBigtableOptions().build().toString())
+        .withLabel("Effective BigtableOptions resulted from configuration of 
given options"));
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(Read.class)
           .add("options", getBigtableOptions())
+          .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
           .add("projectId", getProjectId())
           .add("instanceId", getInstanceId())
           .add("tableId", getTableId())
           .add("keyRange", getKeyRange())
           .add("filter", getRowFilter())
+          .add("bigtableOptionsConfigurator",
+            getBigtableOptionsConfigurator() == null ? null : 
getBigtableOptionsConfigurator()
+              .getClass().getName())
           .toString();
     }
 
@@ -468,25 +483,41 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
         return getBigtableService();
       }
 
-      BigtableOptions.Builder clonedOptions = getBigtableOptions() != null
-          ? getBigtableOptions().toBuilder()
-          : new BigtableOptions.Builder();
+      BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
 
-      clonedOptions.setUserAgent(pipelineOptions.getUserAgent());
-      if (getInstanceId() != null) {
-        clonedOptions.setInstanceId(getInstanceId());
-      }
-      if (getProjectId() != null) {
-        clonedOptions.setProjectId(getProjectId());
-      }
+      bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
 
       if (getBigtableOptions() != null && 
getBigtableOptions().getCredentialOptions()
           .getCredentialType() == CredentialType.DefaultCredentials) {
-        clonedOptions.setCredentialOptions(
+        bigtableOptions.setCredentialOptions(
             CredentialOptions.credential(
                 pipelineOptions.as(GcpOptions.class).getGcpCredential()));
       }
-      return new BigtableServiceImpl(clonedOptions.build());
+
+      // Default option that should be forced
+      bigtableOptions.setUseCachedDataPool(true);
+
+      return new BigtableServiceImpl(bigtableOptions.build());
+    }
+
+    private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
+      BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null
+        ? getBigtableOptions().toBuilder()
+        : new BigtableOptions.Builder();
+
+      if (getBigtableOptionsConfigurator() != null) {
+        effectiveOptions = 
getBigtableOptionsConfigurator().apply(effectiveOptions);
+      }
+
+      if (getInstanceId() != null) {
+        effectiveOptions.setInstanceId(getInstanceId());
+      }
+
+      if (getProjectId() != null) {
+        effectiveOptions.setProjectId(getProjectId());
+      }
+
+      return effectiveOptions;
     }
   }
 
@@ -516,10 +547,21 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
     @Nullable
     abstract BigtableService getBigtableService();
 
-    /** Returns the Google Cloud Bigtable instance being written to, and other 
parameters. */
+    /**
+     * Returns the Google Cloud Bigtable instance being written to, and other 
parameters.
+     * @deprecated will be replaced by bigtable options configurator.
+     */
+    @Deprecated
     @Nullable
     public abstract BigtableOptions getBigtableOptions();
 
+    /**
+     * Configurator of the effective Bigtable Options.
+     */
+    @Nullable
+    abstract SerializableFunction<BigtableOptions.Builder,
+      BigtableOptions.Builder> getBigtableOptionsConfigurator();
+
     abstract boolean getValidate();
 
     abstract Builder toBuilder();
@@ -533,12 +575,17 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
 
       abstract Builder setTableId(String tableId);
 
+      /** @deprecated will be replaced by bigtable options configurator. */
+      @Deprecated
       abstract Builder setBigtableOptions(BigtableOptions options);
 
       abstract Builder setBigtableService(BigtableService bigtableService);
 
       abstract Builder setValidate(boolean validate);
 
+      abstract Builder setBigtableOptionsConfigurator(
+        SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
optionsConfigurator);
+
       abstract Write build();
     }
 
@@ -575,7 +622,10 @@ public Write withInstanceId(String instanceId) {
      * indicated by the given options, and using any other specified 
customizations.
      *
      * <p>Does not modify this object.
+     *
+     * @deprecated will be replaced by bigtable options configurator.
      */
+    @Deprecated
     public Write withBigtableOptions(BigtableOptions options) {
       return withBigtableOptions(options.toBuilder());
     }
@@ -592,21 +642,29 @@ public Write withBigtableOptions(BigtableOptions options) 
{
      * will have no effect on the returned {@link BigtableIO.Write}.
      *
      * <p>Does not modify this object.
+     *
+     * @deprecated will be replaced by bigtable options configurator.
      */
+    @Deprecated
     public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
       checkArgument(optionsBuilder != null, "optionsBuilder can not be null");
       // TODO: is there a better way to clone a Builder? Want it to be immune 
from user changes.
-      BigtableOptions options = optionsBuilder.build();
+      return 
toBuilder().setBigtableOptions(optionsBuilder.build().toBuilder().build()).build();
+    }
 
-      // Set useBulkApi to true for enabling bulk writes
-      BigtableOptions.Builder clonedBuilder = options.toBuilder()
-          .setBulkOptions(
-              options.getBulkOptions().toBuilder()
-                  .setUseBulkApi(true)
-                  .build())
-          .setUseCachedDataPool(true);
-      BigtableOptions clonedOptions = clonedBuilder.build();
-      return toBuilder().setBigtableOptions(clonedOptions).build();
+    /**
+     * Returns a new {@link BigtableIO.Write} that will write to the Cloud 
Bigtable instance
+     * with customized options provided by the given configurator.
+     *
+     * <p>WARNING: instanceId and projectId should not be set here; provide 
them via
+     * {@link #withProjectId(String)} and {@link #withInstanceId(String)} instead.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withBigtableOptionsConfigurator(
+      SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
configurator) {
+      checkArgument(configurator != null, "configurator can not be null");
+      return toBuilder().setBigtableOptionsConfigurator(configurator).build();
     }
 
     /** Disables validation that the table being written to exists. */
@@ -687,15 +745,23 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
         builder.add(DisplayData.item("instanceId", getInstanceId())
             .withLabel("Bigtable Instnace Id"));
       }
+
+      builder.add(DisplayData.item("effectiveBigtableOptions",
+        effectiveUserProvidedBigtableOptions().build().toString())
+        .withLabel("Effective BigtableOptions resulted from configuration of 
given options"));
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(Write.class)
           .add("options", getBigtableOptions())
+          .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
           .add("tableId", getTableId())
           .add("projectId", getProjectId())
           .add("instanceId", getInstanceId())
+          .add("bigtableOptionsConfigurator",
+          getBigtableOptionsConfigurator() == null ? null : 
getBigtableOptionsConfigurator()
+            .getClass().getName())
           .toString();
     }
 
@@ -713,25 +779,45 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
         return getBigtableService();
       }
 
-      BigtableOptions.Builder clonedOptions = getBigtableOptions() != null
-          ? getBigtableOptions().toBuilder()
-          : new BigtableOptions.Builder();
+      BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
+
+      bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+
+      if (getBigtableOptions() != null && 
getBigtableOptions().getCredentialOptions()
+        .getCredentialType() == CredentialType.DefaultCredentials) {
+        bigtableOptions.setCredentialOptions(
+          CredentialOptions.credential(
+            pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+      }
+
+      // Set useBulkApi to true for enabling bulk writes
+      bigtableOptions
+        .setUseCachedDataPool(true)
+        .setBulkOptions(
+          
effectiveUserProvidedBigtableOptions().build().getBulkOptions().toBuilder()
+            .setUseBulkApi(true)
+            .build());
+
+      return new BigtableServiceImpl(bigtableOptions.build());
+    }
+
+    private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
+      BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null
+        ? getBigtableOptions().toBuilder()
+        : new BigtableOptions.Builder();
+
+      if (getBigtableOptionsConfigurator() != null) {
+        effectiveOptions = 
getBigtableOptionsConfigurator().apply(effectiveOptions);
+      }
 
-      clonedOptions.setUserAgent(pipelineOptions.getUserAgent());
       if (getInstanceId() != null) {
-        clonedOptions.setInstanceId(getInstanceId());
+        effectiveOptions.setInstanceId(getInstanceId());
       }
       if (getProjectId() != null) {
-        clonedOptions.setProjectId(getProjectId());
+        effectiveOptions.setProjectId(getProjectId());
       }
 
-      if (getBigtableOptions() != null && 
getBigtableOptions().getCredentialOptions()
-          .getCredentialType() == CredentialType.DefaultCredentials) {
-        clonedOptions.setCredentialOptions(
-            CredentialOptions.credential(
-                pipelineOptions.as(GcpOptions.class).getGcpCredential()));
-      }
-      return new BigtableServiceImpl(clonedOptions.build());
+      return effectiveOptions;
     }
 
     private class BigtableWriterFn extends DoFn<KV<ByteString, 
Iterable<Mutation>>, Void> {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index a976e4ad351..418db92c4bf 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -140,6 +140,15 @@ public BigtableService apply(PipelineOptions input) {
   private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> 
BIGTABLE_WRITE_TYPE =
       new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
 
+  private static final SerializableFunction<BigtableOptions.Builder, 
BigtableOptions.Builder>
+    PORT_CONFIGURATOR =
+    new SerializableFunction<BigtableOptions.Builder, 
BigtableOptions.Builder>() {
+      @Override
+      public BigtableOptions.Builder apply(BigtableOptions.Builder input) {
+        return input.setPort(1234);
+      }
+    };
+
   @Before
   public void setup() throws Exception {
     service = new FakeBigtableService();
@@ -158,12 +167,14 @@ public void testReadBuildsCorrectly() {
         BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS)
             .withTableId("table")
             .withInstanceId("instance")
-            .withProjectId("project");
+            .withProjectId("project")
+            .withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
     assertEquals("options_project", read.getBigtableOptions().getProjectId());
     assertEquals("options_instance", 
read.getBigtableOptions().getInstanceId());
     assertEquals("instance", read.getInstanceId());
     assertEquals("project", read.getProjectId());
     assertEquals("table", read.getTableId());
+    assertEquals(PORT_CONFIGURATOR, read.getBigtableOptionsConfigurator());
   }
 
   @Test
@@ -214,12 +225,14 @@ public void testWriteBuildsCorrectly() {
         BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS)
             .withTableId("table")
             .withInstanceId("instance")
-            .withProjectId("project");
+            .withProjectId("project")
+            .withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
     assertEquals("table", write.getTableId());
     assertEquals("options_project", write.getBigtableOptions().getProjectId());
     assertEquals("options_instance", 
write.getBigtableOptions().getInstanceId());
     assertEquals("instance", write.getInstanceId());
     assertEquals("project", write.getProjectId());
+    assertEquals(PORT_CONFIGURATOR, write.getBigtableOptionsConfigurator());
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> BigtableIO should use ValueProviders 
> -------------------------------------
>
>                 Key: BEAM-3008
>                 URL: https://issues.apache.org/jira/browse/BEAM-3008
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-gcp
>            Reporter: Solomon Duskis
>            Assignee: Solomon Duskis
>
> [https://github.com/apache/beam/pull/2057] is an effort towards BigtableIO 
> templatization.  This Issue is a request to get a fully featured template for 
> BigtableIO.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to