[
https://issues.apache.org/jira/browse/BEAM-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286771#comment-16286771
]
ASF GitHub Bot commented on BEAM-3008:
--------------------------------------
chamikaramj closed pull request #4205: [BEAM-3008] Adds BigtableOptions
configurator to the BigtableIO
URL: https://github.com/apache/beam/pull/4205
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it is merged, the diff is reproduced below for the sake of provenance:
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 8b4609da224..febdc1f53b0 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -122,43 +122,19 @@
* idempotent transformation to that row.
*
* <p>To configure a Cloud Bigtable sink, you must supply a table id, a
project id, an instance id
- * and optionally and optionally a {@link BigtableOptions} to provide more
specific connection
- * configuration, for example:
+ * and optionally a configuration function for {@link BigtableOptions} to
provide more specific
+ * connection configuration, for example:
*
* <pre>{@code
* PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
*
* data.apply("write",
* BigtableIO.write()
- * .setProjectId("project")
- * .setInstanceId("instance")
+ * .withProjectId("project")
+ * .withInstanceId("instance")
* .withTableId("table"));
* }</pre>
*
- * <h3>Using local emulator</h3>
- *
- * <p>In order to use local emulator for Bigtable you should use:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setUsePlaintextNegotiation(true)
- * .setCredentialOptions(CredentialOptions.nullCredential())
- * .setDataHost("127.0.0.1") // network interface where Bigtable
emulator is bound
- * .setInstanceAdminHost("127.0.0.1")
- * .setTableAdminHost("127.0.0.1")
- * .setPort(LOCAL_EMULATOR_PORT))
- *
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
- *
- * data.apply("write",
- * BigtableIO.write()
- * .withBigtableOptions(optionsBuilder)
- * .setProjectId("project")
- * .setInstanceId("instance")
- * .withTableId("table");
- * }</pre>
- *
* <h3>Experimental</h3>
*
* <p>This connector for Cloud Bigtable is considered experimental and may
break or receive
@@ -239,12 +215,23 @@ public static Write write() {
@Nullable
abstract BigtableService getBigtableService();
- /** Returns the Google Cloud Bigtable instance being read from, and other
parameters. */
+ /**
+ * Returns the Google Cloud Bigtable instance being read from, and other
parameters.
+ * @deprecated will be replaced by bigtable options configurator.
+ */
+ @Deprecated
@Nullable
public abstract BigtableOptions getBigtableOptions();
public abstract boolean getValidate();
+ /**
+ * Configurator of the effective Bigtable Options.
+ */
+ @Nullable
+ abstract SerializableFunction<BigtableOptions.Builder,
+ BigtableOptions.Builder> getBigtableOptionsConfigurator();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -260,12 +247,17 @@ public static Write write() {
abstract Builder setTableId(String tableId);
+ /** @deprecated will be replaced by bigtable options configurator. */
+ @Deprecated
abstract Builder setBigtableOptions(BigtableOptions options);
abstract Builder setBigtableService(BigtableService bigtableService);
abstract Builder setValidate(boolean validate);
+ abstract Builder setBigtableOptionsConfigurator(
+ SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
optionsConfigurator);
+
abstract Read build();
}
@@ -302,7 +294,10 @@ public Read withInstanceId(String instanceId) {
* indicated by {@link #withProjectId(String)}, and using any other
specified customizations.
*
* <p>Does not modify this object.
+ *
+ * @deprecated will be replaced by bigtable options configurator.
*/
+ @Deprecated
public Read withBigtableOptions(BigtableOptions options) {
checkArgument(options != null, "options can not be null");
return withBigtableOptions(options.toBuilder());
@@ -320,17 +315,29 @@ public Read withBigtableOptions(BigtableOptions options) {
* will have no effect on the returned {@link BigtableIO.Read}.
*
* <p>Does not modify this object.
+ *
+ * @deprecated will be replaced by bigtable options configurator.
*/
+ @Deprecated
public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
checkArgument(optionsBuilder != null, "optionsBuilder can not be null");
// TODO: is there a better way to clone a Builder? Want it to be immune
from user changes.
- BigtableOptions options = optionsBuilder.build();
-
- BigtableOptions.Builder clonedBuilder = options.toBuilder()
- .setUseCachedDataPool(true);
- BigtableOptions clonedOptions = clonedBuilder.build();
+ return
toBuilder().setBigtableOptions(optionsBuilder.build().toBuilder().build()).build();
+ }
- return toBuilder().setBigtableOptions(clonedOptions).build();
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read from the Cloud
Bigtable instance
+ * with customized options provided by given configurator.
+ *
+ * <p>WARNING: instanceId and projectId should not be provided here;
provide them via
+ * {@link #withProjectId(String)} and {@link #withInstanceId(String)} instead.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withBigtableOptionsConfigurator(
+ SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
configurator) {
+ checkArgument(configurator != null, "configurator can not be null");
+ return toBuilder().setBigtableOptionsConfigurator(configurator).build();
}
/**
@@ -427,17 +434,25 @@ public void populateDisplayData(DisplayData.Builder
builder) {
builder.add(DisplayData.item("rowFilter", getRowFilter().toString())
.withLabel("Table Row Filter"));
}
+
+ builder.add(DisplayData.item("effectiveBigtableOptions",
+ effectiveUserProvidedBigtableOptions().build().toString())
+ .withLabel("Effective BigtableOptions resulted from configuration of
given options"));
}
@Override
public String toString() {
return MoreObjects.toStringHelper(Read.class)
.add("options", getBigtableOptions())
+ .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
.add("projectId", getProjectId())
.add("instanceId", getInstanceId())
.add("tableId", getTableId())
.add("keyRange", getKeyRange())
.add("filter", getRowFilter())
+ .add("bigtableOptionsConfigurator",
+ getBigtableOptionsConfigurator() == null ? null :
getBigtableOptionsConfigurator()
+ .getClass().getName())
.toString();
}
@@ -468,25 +483,41 @@ BigtableService getBigtableService(PipelineOptions
pipelineOptions) {
return getBigtableService();
}
- BigtableOptions.Builder clonedOptions = getBigtableOptions() != null
- ? getBigtableOptions().toBuilder()
- : new BigtableOptions.Builder();
+ BigtableOptions.Builder bigtableOptions =
effectiveUserProvidedBigtableOptions();
- clonedOptions.setUserAgent(pipelineOptions.getUserAgent());
- if (getInstanceId() != null) {
- clonedOptions.setInstanceId(getInstanceId());
- }
- if (getProjectId() != null) {
- clonedOptions.setProjectId(getProjectId());
- }
+ bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
if (getBigtableOptions() != null &&
getBigtableOptions().getCredentialOptions()
.getCredentialType() == CredentialType.DefaultCredentials) {
- clonedOptions.setCredentialOptions(
+ bigtableOptions.setCredentialOptions(
CredentialOptions.credential(
pipelineOptions.as(GcpOptions.class).getGcpCredential()));
}
- return new BigtableServiceImpl(clonedOptions.build());
+
+ // Default option that should be forced
+ bigtableOptions.setUseCachedDataPool(true);
+
+ return new BigtableServiceImpl(bigtableOptions.build());
+ }
+
+ private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
+ BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null
+ ? getBigtableOptions().toBuilder()
+ : new BigtableOptions.Builder();
+
+ if (getBigtableOptionsConfigurator() != null) {
+ effectiveOptions =
getBigtableOptionsConfigurator().apply(effectiveOptions);
+ }
+
+ if (getInstanceId() != null) {
+ effectiveOptions.setInstanceId(getInstanceId());
+ }
+
+ if (getProjectId() != null) {
+ effectiveOptions.setProjectId(getProjectId());
+ }
+
+ return effectiveOptions;
}
}
@@ -516,10 +547,21 @@ BigtableService getBigtableService(PipelineOptions
pipelineOptions) {
@Nullable
abstract BigtableService getBigtableService();
- /** Returns the Google Cloud Bigtable instance being written to, and other
parameters. */
+ /**
+ * Returns the Google Cloud Bigtable instance being written to, and other
parameters.
+ * @deprecated will be replaced by bigtable options configurator.
+ */
+ @Deprecated
@Nullable
public abstract BigtableOptions getBigtableOptions();
+ /**
+ * Configurator of the effective Bigtable Options.
+ */
+ @Nullable
+ abstract SerializableFunction<BigtableOptions.Builder,
+ BigtableOptions.Builder> getBigtableOptionsConfigurator();
+
abstract boolean getValidate();
abstract Builder toBuilder();
@@ -533,12 +575,17 @@ BigtableService getBigtableService(PipelineOptions
pipelineOptions) {
abstract Builder setTableId(String tableId);
+ /** @deprecated will be replaced by bigtable options configurator. */
+ @Deprecated
abstract Builder setBigtableOptions(BigtableOptions options);
abstract Builder setBigtableService(BigtableService bigtableService);
abstract Builder setValidate(boolean validate);
+ abstract Builder setBigtableOptionsConfigurator(
+ SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
optionsConfigurator);
+
abstract Write build();
}
@@ -575,7 +622,10 @@ public Write withInstanceId(String instanceId) {
* indicated by the given options, and using any other specified
customizations.
*
* <p>Does not modify this object.
+ *
+ * @deprecated will be replaced by bigtable options configurator.
*/
+ @Deprecated
public Write withBigtableOptions(BigtableOptions options) {
return withBigtableOptions(options.toBuilder());
}
@@ -592,21 +642,29 @@ public Write withBigtableOptions(BigtableOptions options)
{
* will have no effect on the returned {@link BigtableIO.Write}.
*
* <p>Does not modify this object.
+ *
+ * @deprecated will be replaced by bigtable options configurator.
*/
+ @Deprecated
public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
checkArgument(optionsBuilder != null, "optionsBuilder can not be null");
// TODO: is there a better way to clone a Builder? Want it to be immune
from user changes.
- BigtableOptions options = optionsBuilder.build();
+ return
toBuilder().setBigtableOptions(optionsBuilder.build().toBuilder().build()).build();
+ }
- // Set useBulkApi to true for enabling bulk writes
- BigtableOptions.Builder clonedBuilder = options.toBuilder()
- .setBulkOptions(
- options.getBulkOptions().toBuilder()
- .setUseBulkApi(true)
- .build())
- .setUseCachedDataPool(true);
- BigtableOptions clonedOptions = clonedBuilder.build();
- return toBuilder().setBigtableOptions(clonedOptions).build();
+ /**
+ * Returns a new {@link BigtableIO.Write} that will write to the Cloud
Bigtable instance
+ * with customized options provided by the given configurator.
+ *
+ * <p>WARNING: instanceId and projectId should not be provided here;
provide them via
+ * {@link #withProjectId(String)} and {@link #withInstanceId(String)} instead.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withBigtableOptionsConfigurator(
+ SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
configurator) {
+ checkArgument(configurator != null, "configurator can not be null");
+ return toBuilder().setBigtableOptionsConfigurator(configurator).build();
}
/** Disables validation that the table being written to exists. */
@@ -687,15 +745,23 @@ public void populateDisplayData(DisplayData.Builder
builder) {
builder.add(DisplayData.item("instanceId", getInstanceId())
.withLabel("Bigtable Instnace Id"));
}
+
+ builder.add(DisplayData.item("effectiveBigtableOptions",
+ effectiveUserProvidedBigtableOptions().build().toString())
+ .withLabel("Effective BigtableOptions resulted from configuration of
given options"));
}
@Override
public String toString() {
return MoreObjects.toStringHelper(Write.class)
.add("options", getBigtableOptions())
+ .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
.add("tableId", getTableId())
.add("projectId", getProjectId())
.add("instanceId", getInstanceId())
+ .add("bigtableOptionsConfigurator",
+ getBigtableOptionsConfigurator() == null ? null :
getBigtableOptionsConfigurator()
+ .getClass().getName())
.toString();
}
@@ -713,25 +779,45 @@ BigtableService getBigtableService(PipelineOptions
pipelineOptions) {
return getBigtableService();
}
- BigtableOptions.Builder clonedOptions = getBigtableOptions() != null
- ? getBigtableOptions().toBuilder()
- : new BigtableOptions.Builder();
+ BigtableOptions.Builder bigtableOptions =
effectiveUserProvidedBigtableOptions();
+
+ bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+
+ if (getBigtableOptions() != null &&
getBigtableOptions().getCredentialOptions()
+ .getCredentialType() == CredentialType.DefaultCredentials) {
+ bigtableOptions.setCredentialOptions(
+ CredentialOptions.credential(
+ pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+ }
+
+ // Set useBulkApi to true for enabling bulk writes
+ bigtableOptions
+ .setUseCachedDataPool(true)
+ .setBulkOptions(
+
effectiveUserProvidedBigtableOptions().build().getBulkOptions().toBuilder()
+ .setUseBulkApi(true)
+ .build());
+
+ return new BigtableServiceImpl(bigtableOptions.build());
+ }
+
+ private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
+ BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null
+ ? getBigtableOptions().toBuilder()
+ : new BigtableOptions.Builder();
+
+ if (getBigtableOptionsConfigurator() != null) {
+ effectiveOptions =
getBigtableOptionsConfigurator().apply(effectiveOptions);
+ }
- clonedOptions.setUserAgent(pipelineOptions.getUserAgent());
if (getInstanceId() != null) {
- clonedOptions.setInstanceId(getInstanceId());
+ effectiveOptions.setInstanceId(getInstanceId());
}
if (getProjectId() != null) {
- clonedOptions.setProjectId(getProjectId());
+ effectiveOptions.setProjectId(getProjectId());
}
- if (getBigtableOptions() != null &&
getBigtableOptions().getCredentialOptions()
- .getCredentialType() == CredentialType.DefaultCredentials) {
- clonedOptions.setCredentialOptions(
- CredentialOptions.credential(
- pipelineOptions.as(GcpOptions.class).getGcpCredential()));
- }
- return new BigtableServiceImpl(clonedOptions.build());
+ return effectiveOptions;
}
private class BigtableWriterFn extends DoFn<KV<ByteString,
Iterable<Mutation>>, Void> {
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index a976e4ad351..418db92c4bf 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -140,6 +140,15 @@ public BigtableService apply(PipelineOptions input) {
private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>>
BIGTABLE_WRITE_TYPE =
new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
+ private static final SerializableFunction<BigtableOptions.Builder,
BigtableOptions.Builder>
+ PORT_CONFIGURATOR =
+ new SerializableFunction<BigtableOptions.Builder,
BigtableOptions.Builder>() {
+ @Override
+ public BigtableOptions.Builder apply(BigtableOptions.Builder input) {
+ return input.setPort(1234);
+ }
+ };
+
@Before
public void setup() throws Exception {
service = new FakeBigtableService();
@@ -158,12 +167,14 @@ public void testReadBuildsCorrectly() {
BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId("table")
.withInstanceId("instance")
- .withProjectId("project");
+ .withProjectId("project")
+ .withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
assertEquals("options_project", read.getBigtableOptions().getProjectId());
assertEquals("options_instance",
read.getBigtableOptions().getInstanceId());
assertEquals("instance", read.getInstanceId());
assertEquals("project", read.getProjectId());
assertEquals("table", read.getTableId());
+ assertEquals(PORT_CONFIGURATOR, read.getBigtableOptionsConfigurator());
}
@Test
@@ -214,12 +225,14 @@ public void testWriteBuildsCorrectly() {
BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId("table")
.withInstanceId("instance")
- .withProjectId("project");
+ .withProjectId("project")
+ .withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
assertEquals("table", write.getTableId());
assertEquals("options_project", write.getBigtableOptions().getProjectId());
assertEquals("options_instance",
write.getBigtableOptions().getInstanceId());
assertEquals("instance", write.getInstanceId());
assertEquals("project", write.getProjectId());
+ assertEquals(PORT_CONFIGURATOR, write.getBigtableOptionsConfigurator());
}
@Test
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> BigtableIO should use ValueProviders
> -------------------------------------
>
> Key: BEAM-3008
> URL: https://issues.apache.org/jira/browse/BEAM-3008
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-gcp
> Reporter: Solomon Duskis
> Assignee: Solomon Duskis
>
> [https://github.com/apache/beam/pull/2057] is an effort towards BigtableIO
> templatization. This Issue is a request to get a fully featured template for
> BigtableIO.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)