[ https://issues.apache.org/jira/browse/BEAM-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270091#comment-16270091 ]
ASF GitHub Bot commented on BEAM-3008:
--------------------------------------
chamikaramj closed pull request #4171: [BEAM-3008] Extends API for BigtableIO Read and Write by adding withInstanceId and withProjectId
URL: https://github.com/apache/beam/pull/4171
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 29dc269f7fd..8f04c9dfdfd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -78,38 +78,37 @@
* <p>The Bigtable source returns a set of rows from a single table, returning a
* {@code PCollection<Row>}.
*
- * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable instance. By default, {@link BigtableIO.Read} will read all rows in the table. The row
- * range to be read can optionally be restricted using {@link BigtableIO.Read#withKeyRange}, and
- * a {@link RowFilter} can be specified using {@link BigtableIO.Read#withRowFilter}. For example:
+ * <p>To configure a Cloud Bigtable source, you must supply a table id, a project id, an instance
+ * id and optionally a {@link BigtableOptions} to provide more specific connection configuration.
+ * By default, {@link BigtableIO.Read} will read all rows in the table. The row range to be read
+ * can optionally be restricted using {@link BigtableIO.Read#withKeyRange}, and a {@link RowFilter}
+ * can be specified using {@link BigtableIO.Read#withRowFilter}. For example:
*
* <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setInstanceId("instance");
*
* Pipeline p = ...;
*
* // Scan the entire table.
* p.apply("read",
* BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
+ * .withProjectId(projectId)
+ * .withInstanceId(instanceId)
* .withTableId("table"));
*
* // Scan a prefix of the table.
* ByteKeyRange keyRange = ...;
* p.apply("read",
* BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
+ * .withProjectId(projectId)
+ * .withInstanceId(instanceId)
* .withTableId("table")
* .withKeyRange(keyRange));
*
* // Scan a subset of rows that match the specified row filter.
* p.apply("filtered read",
* BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
+ * .withProjectId(projectId)
+ * .withInstanceId(instanceId)
* .withTableId("table")
* .withRowFilter(filter));
* }</pre>
@@ -121,21 +120,17 @@
* {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
* idempotent transformation to that row.
*
- * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable instance, for example:
+ * <p>To configure a Cloud Bigtable sink, you must supply a table id, a project id, an instance id
+ * and optionally a {@link BigtableOptions} to provide more specific connection
+ * configuration, for example:
*
* <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setInstanceId("instance");
- *
* PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
*
* data.apply("write",
* BigtableIO.write()
- * .withBigtableOptions(optionsBuilder)
+ * .withProjectId("project")
+ * .withInstanceId("instance")
* .withTableId("table"));
* }</pre>
*
@@ -146,8 +141,6 @@
* <pre>{@code
* BigtableOptions.Builder optionsBuilder =
* new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setInstanceId("instance")
* .setUsePlaintextNegotiation(true)
* .setCredentialOptions(CredentialOptions.nullCredential())
* .setDataHost("127.0.0.1") // network interface where Bigtable emulator is bound
@@ -160,6 +153,8 @@
* data.apply("write",
* BigtableIO.write()
* .withBigtableOptions(optionsBuilder)
+ * .withProjectId("project")
+ * .withInstanceId("instance")
* .withTableId("table");
* }</pre>
*
@@ -221,6 +216,14 @@ public static Write write() {
@AutoValue
public abstract static class Read extends PTransform<PBegin,
PCollection<Row>> {
+ /** Returns the project id being read from. */
+ @Nullable
+ abstract String getProjectId();
+
+ /** Returns the instance id being read from. */
+ @Nullable
+ abstract String getInstanceId();
+
@Nullable
abstract RowFilter getRowFilter();
@@ -246,6 +249,10 @@ public static Write write() {
@AutoValue.Builder
abstract static class Builder {
+ abstract Builder setProjectId(String projectId);
+
+ abstract Builder setInstanceId(String instanceId);
+
abstract Builder setRowFilter(RowFilter filter);
abstract Builder setKeyRange(ByteKeyRange keyRange);
@@ -261,9 +268,37 @@ public static Write write() {
abstract Read build();
}
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable project
+ * indicated by the given parameter. Requires {@link #withInstanceId(String)} to be called to
+ * determine the instance.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withProjectId(String projectId) {
+ checkNotNull(projectId, "Project Id of BigTable can not be null");
+ return toBuilder().setProjectId(projectId).build();
+ }
+
/**
* Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
- * indicated by the given options, and using any other specified customizations.
+ * indicated by the given parameter. Requires {@link #withProjectId(String)} to be called to
+ * determine the project.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withInstanceId(String instanceId) {
+ checkNotNull(instanceId, "Instance Id of BigTable can not be null");
+ return toBuilder().setInstanceId(instanceId).build();
+ }
+
+ /**
+ * WARNING: Should be used only to specify additional parameters for the connection to
+ * Cloud Bigtable; instanceId and projectId should be provided via
+ * {@link #withInstanceId(String)} and {@link #withProjectId(String)} respectively.
+ *
+ * <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
+ * indicated by {@link #withProjectId(String)}, and using any other specified customizations.
*
* <p>Does not modify this object.
*/
@@ -273,7 +308,11 @@ public Read withBigtableOptions(BigtableOptions options) {
}
/**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
+ * WARNING: Should be used only to specify additional parameters for the connection to
+ * Cloud Bigtable; instanceId and projectId should be provided via
+ * {@link #withInstanceId(String)} and {@link #withProjectId(String)} respectively.
+ *
+ * <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
* indicated by the given options, and using any other specified customizations.
*
* <p>Clones the given {@link BigtableOptions} builder so that any further
changes
@@ -331,8 +370,9 @@ public Read withoutValidation() {
@Override
public PCollection<Row> expand(PBegin input) {
- checkArgument(getBigtableOptions() != null, "withBigtableOptions() is required");
+ validateBigtableConfig(getBigtableOptions(), getProjectId(), getInstanceId());
checkArgument(getTableId() != null && !getTableId().isEmpty(), "withTableId() is required");
+
BigtableSource source =
new BigtableSource(new SerializableFunction<PipelineOptions,
BigtableService>() {
@Override
@@ -369,6 +409,16 @@ public void populateDisplayData(DisplayData.Builder builder) {
.withLabel("Bigtable Options"));
}
+ if (getProjectId() != null) {
+ builder.add(DisplayData.item("projectId", getProjectId())
+ .withLabel("Bigtable Project Id"));
+ }
+
+ if (getInstanceId() != null) {
+ builder.add(DisplayData.item("instanceId", getInstanceId())
+ .withLabel("Bigtable Instnace Id"));
+ }
+
builder.addIfNotDefault(
DisplayData.item("keyRange", getKeyRange().toString()),
ByteKeyRange.ALL_KEYS.toString());
@@ -382,6 +432,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
public String toString() {
return MoreObjects.toStringHelper(Read.class)
.add("options", getBigtableOptions())
+ .add("projectId", getProjectId())
+ .add("instanceId", getInstanceId())
.add("tableId", getTableId())
.add("keyRange", getKeyRange())
.add("filter", getRowFilter())
@@ -414,9 +466,20 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) {
if (getBigtableService() != null) {
return getBigtableService();
}
- BigtableOptions.Builder clonedOptions = getBigtableOptions().toBuilder();
+
+ BigtableOptions.Builder clonedOptions = getBigtableOptions() != null
+ ? getBigtableOptions().toBuilder()
+ : new BigtableOptions.Builder();
+
clonedOptions.setUserAgent(pipelineOptions.getUserAgent());
- if (getBigtableOptions().getCredentialOptions()
+ if (getInstanceId() != null) {
+ clonedOptions.setInstanceId(getInstanceId());
+ }
+ if (getProjectId() != null) {
+ clonedOptions.setProjectId(getProjectId());
+ }
+
+ if (getBigtableOptions() != null &&
getBigtableOptions().getCredentialOptions()
.getCredentialType() == CredentialType.DefaultCredentials) {
clonedOptions.setCredentialOptions(
CredentialOptions.credential(
@@ -437,6 +500,14 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) {
public abstract static class Write
extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
+ /** Returns the project id being written to. */
+ @Nullable
+ abstract String getProjectId();
+
+ /** Returns the instance id being written to. */
+ @Nullable
+ abstract String getInstanceId();
+
/** Returns the table being written to. */
@Nullable
abstract String getTableId();
@@ -455,6 +526,10 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) {
@AutoValue.Builder
abstract static class Builder {
+ abstract Builder setProjectId(String projectId);
+
+ abstract Builder setInstanceId(String instanceId);
+
abstract Builder setTableId(String tableId);
abstract Builder setBigtableOptions(BigtableOptions options);
@@ -467,7 +542,35 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
+ * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable project
+ * indicated by the given parameter. Requires {@link #withInstanceId(String)}
+ * to be called to determine the instance.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withProjectId(String projectId) {
+ checkNotNull(projectId, "Project Id of BigTable can not be null");
+ return toBuilder().setProjectId(projectId).build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable instance
+ * indicated by the given parameter. Requires {@link #withProjectId(String)} to be called to
+ * determine the project.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withInstanceId(String instanceId) {
+ checkNotNull(instanceId, "Instance Id of BigTable can not be null");
+ return toBuilder().setInstanceId(instanceId).build();
+ }
+
+ /**
+ * WARNING: Should be used only to specify additional parameters for the connection to
+ * Cloud Bigtable; instanceId and projectId should be provided via
+ * {@link #withInstanceId(String)} and {@link #withProjectId(String)} respectively.
+ *
+ * <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
* indicated by the given options, and using any other specified customizations.
*
* <p>Does not modify this object.
@@ -477,7 +580,11 @@ public Write withBigtableOptions(BigtableOptions options) {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
+ * WARNING: Should be used only to specify additional parameters for the connection
+ * to Cloud Bigtable; instanceId and projectId should be provided via
+ * {@link #withInstanceId(String)} and {@link #withProjectId(String)} respectively.
+ *
+ * <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
* indicated by the given options, and using any other specified customizations.
*
* <p>Clones the given {@link BigtableOptions} builder so that any further
changes
@@ -518,16 +625,16 @@ public Write withTableId(String tableId) {
@Override
public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
- checkArgument(getBigtableOptions() != null, "withBigtableOptions() is required");
+ validateBigtableConfig(getBigtableOptions(), getProjectId(), getInstanceId());
checkArgument(getTableId() != null && !getTableId().isEmpty(), "withTableId() is required");
input.apply(ParDo.of(new BigtableWriterFn(getTableId(),
new SerializableFunction<PipelineOptions, BigtableService>() {
- @Override
- public BigtableService apply(PipelineOptions options) {
- return getBigtableService(options);
- }
- })));
+ @Override
+ public BigtableService apply(PipelineOptions options) {
+ return getBigtableService(options);
+ }
+ })));
return PDone.in(input.getPipeline());
}
@@ -569,6 +676,16 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("bigtableOptions",
getBigtableOptions().toString())
.withLabel("Bigtable Options"));
}
+
+ if (getProjectId() != null) {
+ builder.add(DisplayData.item("projectId", getProjectId())
+ .withLabel("Bigtable Project Id"));
+ }
+
+ if (getInstanceId() != null) {
+ builder.add(DisplayData.item("instanceId", getInstanceId())
+ .withLabel("Bigtable Instnace Id"));
+ }
}
@Override
@@ -576,6 +693,8 @@ public String toString() {
return MoreObjects.toStringHelper(Write.class)
.add("options", getBigtableOptions())
.add("tableId", getTableId())
+ .add("projectId", getProjectId())
+ .add("instanceId", getInstanceId())
.toString();
}
@@ -592,9 +711,20 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) {
if (getBigtableService() != null) {
return getBigtableService();
}
- BigtableOptions.Builder clonedOptions = getBigtableOptions().toBuilder();
+
+ BigtableOptions.Builder clonedOptions = getBigtableOptions() != null
+ ? getBigtableOptions().toBuilder()
+ : new BigtableOptions.Builder();
+
clonedOptions.setUserAgent(pipelineOptions.getUserAgent());
- if (getBigtableOptions().getCredentialOptions()
+ if (getInstanceId() != null) {
+ clonedOptions.setInstanceId(getInstanceId());
+ }
+ if (getProjectId() != null) {
+ clonedOptions.setProjectId(getProjectId());
+ }
+
+ if (getBigtableOptions() != null &&
getBigtableOptions().getCredentialOptions()
.getCredentialType() == CredentialType.DefaultCredentials) {
clonedOptions.setCredentialOptions(
CredentialOptions.credential(
@@ -1101,4 +1231,16 @@ public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwab
cause);
}
}
+
+ static void validateBigtableConfig(BigtableOptions options, String projectId, String instanceId) {
+ checkArgument(projectId != null && !projectId.isEmpty()
+ || options != null && options.getProjectId() != null
+ && !options.getProjectId().isEmpty(),
+ "Could not obtain Bigtable project id");
+
+ checkArgument(instanceId != null && !instanceId.isEmpty()
+ || options != null && options.getInstanceId() != null
+ && !options.getInstanceId().isEmpty(),
+ "Could not obtain Bigtable instance id");
+ }
}
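For context, here is a minimal sketch of how user code reads after the BigtableIO.java changes above. This is not code from the PR: the pipeline, the placeholder ids ("my-project", "my-instance", "my-table") and the "..." stand-ins follow the style of the Javadoc examples, and standard Beam and Bigtable imports are assumed.

import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Row;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Pipeline p = ...;

// Read: the project and instance ids are supplied directly on the transform
// instead of through a BigtableOptions.Builder.
PCollection<Row> rows = p.apply("read",
    BigtableIO.read()
        .withProjectId("my-project")
        .withInstanceId("my-instance")
        .withTableId("my-table"));

// Write: the same pair of ids configures the sink.
PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
data.apply("write",
    BigtableIO.write()
        .withProjectId("my-project")
        .withInstanceId("my-instance")
        .withTableId("my-table"));

If neither the direct ids nor a BigtableOptions carrying them are supplied, validateBigtableConfig() rejects the transform at expansion time with "Could not obtain Bigtable project id" or "Could not obtain Bigtable instance id".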
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index af3354bd4ad..a976e4ad351 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -129,13 +129,13 @@ public BigtableService apply(PipelineOptions input) {
};
private static final BigtableOptions BIGTABLE_OPTIONS =
new BigtableOptions.Builder()
- .setProjectId("project")
- .setInstanceId("instance")
+ .setProjectId("options_project")
+ .setInstanceId("options_instance")
.build();
private static BigtableIO.Read defaultRead =
- BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
+ BigtableIO.read().withInstanceId("instance").withProjectId("project");
private static BigtableIO.Write defaultWrite =
- BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
+ BigtableIO.write().withInstanceId("instance").withProjectId("project");
private Coder<KV<ByteString, Iterable<Mutation>>> bigtableCoder;
private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> BIGTABLE_WRITE_TYPE =
new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
@@ -155,42 +155,100 @@ private static ByteKey makeByteKey(ByteString key) {
@Test
public void testReadBuildsCorrectly() {
BigtableIO.Read read =
- BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
- assertEquals("project", read.getBigtableOptions().getProjectId());
- assertEquals("instance", read.getBigtableOptions().getInstanceId());
+ BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS)
+ .withTableId("table")
+ .withInstanceId("instance")
+ .withProjectId("project");
+ assertEquals("options_project", read.getBigtableOptions().getProjectId());
+ assertEquals("options_instance",
read.getBigtableOptions().getInstanceId());
+ assertEquals("instance", read.getInstanceId());
+ assertEquals("project", read.getProjectId());
assertEquals("table", read.getTableId());
}
@Test
- public void testReadBuildsCorrectlyInDifferentOrder() {
- BigtableIO.Read read =
- BigtableIO.read().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
- assertEquals("project", read.getBigtableOptions().getProjectId());
- assertEquals("instance", read.getBigtableOptions().getInstanceId());
- assertEquals("table", read.getTableId());
+ public void testReadValidationFailsMissingTable() {
+ BigtableIO.Read read = BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
+
+ thrown.expect(IllegalArgumentException.class);
+
+ read.expand(null);
+ }
+
+ @Test
+ public void testReadValidationFailsMissingInstanceId() {
+ BigtableIO.Read read = BigtableIO.read().withTableId("table")
+ .withProjectId("project")
+ .withBigtableOptions(new BigtableOptions.Builder().build());
+
+ thrown.expect(IllegalArgumentException.class);
+
+ read.expand(null);
+ }
+
+ @Test
+ public void testReadValidationFailsMissingProjectId() {
+ BigtableIO.Read read = BigtableIO.read().withTableId("table")
+ .withInstanceId("instance")
+ .withBigtableOptions(new BigtableOptions.Builder().build());
+
+ thrown.expect(IllegalArgumentException.class);
+
+ read.expand(null);
+ }
+
+ @Test
+ public void testReadValidationFailsMissingInstanceIdAndProjectId() {
+ BigtableIO.Read read = BigtableIO.read()
+ .withTableId("table")
+ .withBigtableOptions(new BigtableOptions.Builder().build());
+
+ thrown.expect(IllegalArgumentException.class);
+
+ read.expand(null);
}
@Test
public void testWriteBuildsCorrectly() {
BigtableIO.Write write =
- BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
+ BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS)
+ .withTableId("table")
+ .withInstanceId("instance")
+ .withProjectId("project");
assertEquals("table", write.getTableId());
- assertEquals("project", write.getBigtableOptions().getProjectId());
- assertEquals("instance", write.getBigtableOptions().getInstanceId());
+ assertEquals("options_project", write.getBigtableOptions().getProjectId());
+ assertEquals("options_instance",
write.getBigtableOptions().getInstanceId());
+ assertEquals("instance", write.getInstanceId());
+ assertEquals("project", write.getProjectId());
}
@Test
- public void testWriteBuildsCorrectlyInDifferentOrder() {
- BigtableIO.Write write =
- BigtableIO.write().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
- assertEquals("project", write.getBigtableOptions().getProjectId());
- assertEquals("instance", write.getBigtableOptions().getInstanceId());
- assertEquals("table", write.getTableId());
+ public void testWriteValidationFailsMissingInstanceId() {
+ BigtableIO.Write write = BigtableIO.write().withTableId("table")
+ .withProjectId("project")
+ .withBigtableOptions(new BigtableOptions.Builder().build());
+
+ thrown.expect(IllegalArgumentException.class);
+
+ write.expand(null);
}
@Test
- public void testWriteValidationFailsMissingTable() {
- BigtableIO.Write write = BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
+ public void testWriteValidationFailsMissingProjectId() {
+ BigtableIO.Write write = BigtableIO.write().withTableId("table")
+ .withInstanceId("instance")
+ .withBigtableOptions(new BigtableOptions.Builder().build());
+
+ thrown.expect(IllegalArgumentException.class);
+
+ write.expand(null);
+ }
+
+ @Test
+ public void testWriteValidationFailsMissingInstanceIdAndProjectId() {
+ BigtableIO.Write write = BigtableIO.write()
+ .withTableId("table")
+ .withBigtableOptions(new BigtableOptions.Builder().build());
thrown.expect(IllegalArgumentException.class);
@@ -198,7 +256,7 @@ public void testWriteValidationFailsMissingTable() {
}
@Test
- public void testWriteValidationFailsMissingOptions() {
+ public void testWriteValidationFailsMissingOptionsAndInstanceAndProject() {
BigtableIO.Write write = BigtableIO.write().withTableId("table");
thrown.expect(IllegalArgumentException.class);
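A second sketch, again with placeholder values rather than code from the PR, shows the intended split after this change: BigtableOptions carries only extra connection parameters (for example when targeting a local emulator, as in the Javadoc hunk above), while the ids come from withProjectId and withInstanceId. Per getBigtableService(), ids set on the transform are copied onto the cloned options, so they take precedence over any ids set inside BigtableOptions.

// Connection-only options, e.g. for a locally running Bigtable emulator.
BigtableOptions.Builder optionsBuilder =
    new BigtableOptions.Builder()
        .setUsePlaintextNegotiation(true)
        .setCredentialOptions(CredentialOptions.nullCredential())
        .setDataHost("127.0.0.1"); // network interface where the Bigtable emulator is bound

data.apply("write",
    BigtableIO.write()
        .withBigtableOptions(optionsBuilder)  // additional connection parameters only
        .withProjectId("project")             // overrides any project id in optionsBuilder
        .withInstanceId("instance")           // overrides any instance id in optionsBuilder
        .withTableId("table"));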
----------------------------------------------------------------
> BigtableIO should use ValueProviders
> -------------------------------------
>
> Key: BEAM-3008
> URL: https://issues.apache.org/jira/browse/BEAM-3008
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-gcp
> Reporter: Solomon Duskis
> Assignee: Solomon Duskis
>
> [https://github.com/apache/beam/pull/2057] is an effort towards BigtableIO
> templatization. This Issue is a request to get a fully featured template for
> BigtableIO.
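The quoted issue is about templatization: if the ids are ValueProvider<String> rather than String, they can be resolved when a template is executed instead of when the pipeline is constructed. A hypothetical sketch of what that could look like follows; the ValueProvider-accepting overloads of withProjectId/withInstanceId shown here are not part of PR #4171, which only adds the String versions.

// Hypothetical template options; a ValueProvider defers resolution to execution time.
public interface BigtableTemplateOptions extends PipelineOptions {
  ValueProvider<String> getBigtableProjectId();
  void setBigtableProjectId(ValueProvider<String> value);

  ValueProvider<String> getBigtableInstanceId();
  void setBigtableInstanceId(ValueProvider<String> value);
}

BigtableTemplateOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableTemplateOptions.class);

Pipeline p = Pipeline.create(options);
p.apply("read",
    BigtableIO.read()
        .withProjectId(options.getBigtableProjectId())   // hypothetical ValueProvider overload
        .withInstanceId(options.getBigtableInstanceId()) // hypothetical ValueProvider overload
        .withTableId("table"));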