This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f2503ba [BEAM-3008] Adds parameters templatization for Bigtable
(#4357)
f2503ba is described below
commit f2503bad7511ef5f4856fc8af9c24b01a8561b3c
Author: dmytroivanov4206 <[email protected]>
AuthorDate: Wed Jan 24 06:51:31 2018 +0100
[BEAM-3008] Adds parameters templatization for Bigtable (#4357)
---
.../beam/sdk/io/gcp/bigtable/BigtableConfig.java | 100 ++++----
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 222 +++++++++++-------
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 4 +-
.../sdk/io/gcp/bigtable/BigtableConfigTest.java | 252 +++++++++++++++++++++
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 104 +++++----
5 files changed, 494 insertions(+), 188 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
index ba633d0..4d2e4ce 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
@@ -19,7 +19,6 @@
package org.apache.beam.sdk.io.gcp.bigtable;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigtable.config.BigtableOptions;
@@ -30,6 +29,7 @@ import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -43,19 +43,19 @@ abstract class BigtableConfig implements Serializable {
* Returns the project id being written to.
*/
@Nullable
- abstract String getProjectId();
+ abstract ValueProvider<String> getProjectId();
/**
* Returns the instance id being written to.
*/
@Nullable
- abstract String getInstanceId();
+ abstract ValueProvider<String> getInstanceId();
/**
* Returns the table being read from.
*/
@Nullable
- abstract String getTableId();
+ abstract ValueProvider<String> getTableId();
/**
* Returns the Google Cloud Bigtable instance being written to, and other
parameters.
@@ -93,11 +93,11 @@ abstract class BigtableConfig implements Serializable {
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setProjectId(String projectId);
+ abstract Builder setProjectId(ValueProvider<String> projectId);
- abstract Builder setInstanceId(String instanceId);
+ abstract Builder setInstanceId(ValueProvider<String> instanceId);
- abstract Builder setTableId(String tableId);
+ abstract Builder setTableId(ValueProvider<String> tableId);
/**
* @deprecated will be replaced by bigtable options configurator.
@@ -115,18 +115,18 @@ abstract class BigtableConfig implements Serializable {
abstract BigtableConfig build();
}
- BigtableConfig withProjectId(String projectId) {
- checkNotNull(projectId, "Project Id of BigTable can not be null");
+ BigtableConfig withProjectId(ValueProvider<String> projectId) {
+ checkArgument(projectId != null, "Project Id of BigTable can not be null");
return toBuilder().setProjectId(projectId).build();
}
- BigtableConfig withInstanceId(String instanceId) {
- checkNotNull(instanceId, "Instance Id of BigTable can not be null");
+ BigtableConfig withInstanceId(ValueProvider<String> instanceId) {
+ checkArgument(instanceId != null, "Instance Id of BigTable can not be
null");
return toBuilder().setInstanceId(instanceId).build();
}
- BigtableConfig withTableId(String tableId) {
- checkNotNull(tableId, "tableId can not be null");
+ BigtableConfig withTableId(ValueProvider<String> tableId) {
+ checkArgument(tableId != null, "tableId can not be null");
return toBuilder().setTableId(tableId).build();
}
@@ -135,13 +135,13 @@ abstract class BigtableConfig implements Serializable {
*/
@Deprecated
BigtableConfig withBigtableOptions(BigtableOptions options) {
- checkNotNull(options, "Bigtable options can not be null");
+ checkArgument(options != null, "Bigtable options can not be null");
return toBuilder().setBigtableOptions(options).build();
}
BigtableConfig withBigtableOptionsConfigurator(
SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
configurator) {
- checkNotNull(configurator, "configurator can not be null");
+ checkArgument(configurator != null, "configurator can not be null");
return toBuilder().setBigtableOptionsConfigurator(configurator).build();
}
@@ -151,47 +151,40 @@ abstract class BigtableConfig implements Serializable {
@VisibleForTesting
BigtableConfig withBigtableService(BigtableService bigtableService) {
- checkNotNull(bigtableService, "bigtableService can not be null");
+ checkArgument(bigtableService != null, "bigtableService can not be null");
return toBuilder().setBigtableService(bigtableService).build();
}
void validate() {
- checkArgument(getProjectId() != null && !getProjectId().isEmpty()
+ checkArgument(getTableId() != null
+ && (!getTableId().isAccessible() || !getTableId().get().isEmpty()),
+ "Could not obtain Bigtable table id");
+
+ checkArgument(getProjectId() != null
+ && (!getProjectId().isAccessible() || !getProjectId().get().isEmpty())
|| getBigtableOptions() != null && getBigtableOptions().getProjectId()
!= null
&& !getBigtableOptions().getProjectId().isEmpty(),
"Could not obtain Bigtable project id");
- checkArgument(getInstanceId() != null && !getInstanceId().isEmpty()
+ checkArgument(getInstanceId() != null
+ && (!getInstanceId().isAccessible() ||
!getInstanceId().get().isEmpty())
|| getBigtableOptions() != null &&
getBigtableOptions().getInstanceId() != null
&& !getBigtableOptions().getInstanceId().isEmpty(),
"Could not obtain Bigtable instance id");
-
- checkArgument(getTableId() != null && !getTableId().isEmpty(),
- "Could not obtain Bigtable table id");
}
void populateDisplayData(DisplayData.Builder builder) {
- builder.add(DisplayData.item("tableId", getTableId())
- .withLabel("Table ID"));
+ builder
+ .addIfNotNull(DisplayData.item("projectId",
getProjectId()).withLabel("Bigtable Project Id"))
+ .addIfNotNull(DisplayData.item("instanceId", getInstanceId())
+ .withLabel("Bigtable Instance Id"))
+ .addIfNotNull(DisplayData.item("tableId",
getTableId()).withLabel("Bigtable Table Id"))
+ .add(DisplayData.item("withValidation", getValidate()).withLabel("Check
is table exists"));
if (getBigtableOptions() != null) {
builder.add(DisplayData.item("bigtableOptions",
getBigtableOptions().toString())
.withLabel("Bigtable Options"));
}
-
- if (getProjectId() != null) {
- builder.add(DisplayData.item("projectId", getProjectId())
- .withLabel("Bigtable Project Id"));
- }
-
- if (getInstanceId() != null) {
- builder.add(DisplayData.item("instanceId", getInstanceId())
- .withLabel("Bigtable Instnace Id"));
- }
-
- builder.add(DisplayData.item("effectiveBigtableOptions",
- effectiveUserProvidedBigtableOptions().build().toString())
- .withLabel("Effective BigtableOptions resulted from configuration of
given options"));
}
/**
@@ -225,18 +218,10 @@ abstract class BigtableConfig implements Serializable {
return new BigtableServiceImpl(bigtableOptions.build());
}
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(BigtableConfig.class)
- .add("projectId", getProjectId())
- .add("instanceId", getInstanceId())
- .add("tableId", getTableId())
- .add("bigtableOptionsConfigurator",
- getBigtableOptionsConfigurator() == null ? null :
getBigtableOptionsConfigurator()
- .getClass().getName())
- .add("options", getBigtableOptions())
- .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
- .toString();
+ boolean isDataAccessible() {
+ return getTableId().isAccessible()
+ && (getProjectId() == null || getProjectId().isAccessible())
+ && (getInstanceId() == null || getInstanceId().isAccessible());
}
private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
@@ -249,13 +234,26 @@ abstract class BigtableConfig implements Serializable {
}
if (getInstanceId() != null) {
- effectiveOptions.setInstanceId(getInstanceId());
+ effectiveOptions.setInstanceId(getInstanceId().get());
}
if (getProjectId() != null) {
- effectiveOptions.setProjectId(getProjectId());
+ effectiveOptions.setProjectId(getProjectId().get());
}
return effectiveOptions;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(BigtableConfig.class)
+ .add("projectId", getProjectId())
+ .add("instanceId", getInstanceId())
+ .add("tableId", getTableId())
+ .add("bigtableOptionsConfigurator",
+ getBigtableOptionsConfigurator() == null ? null :
getBigtableOptionsConfigurator()
+ .getClass().getName())
+ .add("options", getBigtableOptions())
+ .toString();
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 71c0415..4278f53 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -28,6 +28,7 @@ import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.ImmutableList;
@@ -54,6 +55,7 @@ import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -151,11 +153,11 @@ public class BigtableIO {
/**
* Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code
Read} must be
- * initialized with a
- * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions)
BigtableOptions} that specifies
- * the source Cloud Bigtable instance, and a {@link
BigtableIO.Read#withTableId tableId} that
- * specifies which table to read. A {@link RowFilter} may also optionally be
specified using
- * {@link BigtableIO.Read#withRowFilter}.
+ * initialized with a {@link BigtableIO.Read#withInstanceId} and
+ * {@link BigtableIO.Read#withProjectId} that specifies the source Cloud
Bigtable
+ * instance, and a {@link BigtableIO.Read#withTableId} that specifies which
table to
+ * read. A {@link RowFilter} may also optionally be specified using
+ * {@link BigtableIO.Read#withRowFilter(RowFilter)}.
*/
@Experimental
public static Read read() {
@@ -164,10 +166,10 @@ public class BigtableIO {
/**
* Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code
Write} must be
- * initialized with a
- * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions)
BigtableOptions} that specifies
- * the destination Cloud Bigtable instance, and a {@link
BigtableIO.Write#withTableId tableId}
- * that specifies which table to write.
+ * initialized with a {@link BigtableIO.Write#withProjectId} and
+ * {@link BigtableIO.Write#withInstanceId} that specifies the destination
Cloud
+ * Bigtable instance, and a {@link BigtableIO.Write#withTableId} that
specifies
+ * which table to write.
*/
@Experimental
public static Write write() {
@@ -196,7 +198,8 @@ public class BigtableIO {
/** Returns the table being read from. */
@Nullable
public String getTableId() {
- return getBigtableConfig().getTableId();
+ ValueProvider<String> tableId = getBigtableConfig().getTableId();
+ return tableId != null && tableId.isAccessible() ? tableId.get() : null;
}
/**
@@ -213,7 +216,7 @@ public class BigtableIO {
static Read create() {
BigtableConfig config = BigtableConfig.builder()
- .setTableId("")
+ .setTableId(ValueProvider.StaticValueProvider.of(""))
.setValidate(true)
.build();
@@ -237,35 +240,76 @@ public class BigtableIO {
/**
* Returns a new {@link BigtableIO.Read} that will read from the Cloud
Bigtable project
- * indicated by given parameter, requires {@link #withInstanceId(String)}
to be called to
+ * indicated by given parameter, requires {@link #withInstanceId} to be
called to
* determine the instance.
*
* <p>Does not modify this object.
*/
- public Read withProjectId(String projectId) {
+ public Read withProjectId(ValueProvider<String> projectId) {
BigtableConfig config = getBigtableConfig();
return
toBuilder().setBigtableConfig(config.withProjectId(projectId)).build();
}
/**
+ * Returns a new {@link BigtableIO.Read} that will read from the Cloud
Bigtable project
+ * indicated by given parameter, requires {@link #withInstanceId} to be
called to
+ * determine the instance.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withProjectId(String projectId) {
+ return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+ }
+
+ /**
* Returns a new {@link BigtableIO.Read} that will read from the Cloud
Bigtable instance
- * indicated by given parameter, requires {@link #withProjectId(String)}
to be called to
+ * indicated by given parameter, requires {@link #withProjectId} to be
called to
* determine the project.
*
* <p>Does not modify this object.
*/
- public Read withInstanceId(String instanceId) {
+ public Read withInstanceId(ValueProvider<String> instanceId) {
BigtableConfig config = getBigtableConfig();
return
toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build();
}
/**
+ * Returns a new {@link BigtableIO.Read} that will read from the Cloud
Bigtable instance
+ * indicated by given parameter, requires {@link #withProjectId} to be
called to
+ * determine the project.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withInstanceId(String instanceId) {
+ return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read from the specified
table.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withTableId(ValueProvider<String> tableId) {
+ BigtableConfig config = getBigtableConfig();
+ return
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read from the specified
table.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withTableId(String tableId) {
+ return withTableId(ValueProvider.StaticValueProvider.of(tableId));
+ }
+
+ /**
* WARNING: Should be used only to specify additional parameters for
connection
* to the Cloud Bigtable, instanceId and projectId should be provided over
- * {@link #withInstanceId(String)} and {@link #withProjectId(String)}
respectively.
+ * {@link #withInstanceId} and {@link #withProjectId} respectively.
*
* <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud
Bigtable instance
- * indicated by {@link #withProjectId(String)}, and using any other
specified customizations.
+ * indicated by {@link #withProjectId}, and using any other specified
customizations.
*
* <p>Does not modify this object.
*
@@ -280,7 +324,7 @@ public class BigtableIO {
/**
* WARNING: Should be used only to specify additional parameters for
connection to
* the Cloud Bigtable, instanceId and projectId should be provided over
- * {@link #withInstanceId(String)} and {@link #withProjectId(String)}
respectively.
+ * {@link #withInstanceId} and {@link #withProjectId} respectively.
*
* <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud
Bigtable instance
* indicated by the given options, and using any other specified
customizations.
@@ -306,7 +350,7 @@ public class BigtableIO {
* with customized options provided by given configurator.
*
* <p>WARNING: instanceId and projectId should not be provided here and
should be provided over
- * {@link #withProjectId(String)} and {@link #withInstanceId(String)}.
+ * {@link #withProjectId} and {@link #withInstanceId}.
*
* <p>Does not modify this object.
*/
@@ -354,16 +398,6 @@ public class BigtableIO {
return toBuilder().setKeyRanges(keyRanges).build();
}
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the specified
table.
- *
- * <p>Does not modify this object.
- */
- public Read withTableId(String tableId) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
- }
-
/** Disables validation that the table being read from exists. */
public Read withoutValidation() {
BigtableConfig config = getBigtableConfig();
@@ -378,6 +412,7 @@ public class BigtableIO {
*
* <p>Does not modify this object.
*/
+ @VisibleForTesting
Read withBigtableService(BigtableService bigtableService) {
BigtableConfig config = getBigtableConfig();
return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
@@ -388,12 +423,7 @@ public class BigtableIO {
getBigtableConfig().validate();
BigtableSource source =
- new BigtableSource(new SerializableFunction<PipelineOptions,
BigtableService>() {
- @Override
- public BigtableService apply(PipelineOptions options) {
- return getBigtableConfig().getBigtableService(options);
- }
- }, getTableId(), getRowFilter(), getKeyRanges(), null);
+ new BigtableSource(getBigtableConfig(), getRowFilter(),
getKeyRanges(), null);
return
input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
}
@@ -476,7 +506,7 @@ public class BigtableIO {
static Write create() {
BigtableConfig config = BigtableConfig.builder()
- .setTableId("")
+ .setTableId(ValueProvider.StaticValueProvider.of(""))
.setValidate(true)
.setBigtableOptionsConfigurator(enableBulkApiConfigurator(null))
.build();
@@ -494,32 +524,73 @@ public class BigtableIO {
/**
* Returns a new {@link BigtableIO.Read} that will write into the Cloud
Bigtable project
- * indicated by given parameter, requires {@link #withInstanceId(String)}
+ * indicated by given parameter, requires {@link #withInstanceId}
* to be called to determine the instance.
*
* <p>Does not modify this object.
*/
- public Write withProjectId(String projectId) {
+ public Write withProjectId(ValueProvider<String> projectId) {
BigtableConfig config = getBigtableConfig();
return
toBuilder().setBigtableConfig(config.withProjectId(projectId)).build();
}
/**
+ * Returns a new {@link BigtableIO.Write} that will write into the Cloud
Bigtable project
+ * indicated by given parameter, requires {@link #withInstanceId}
+ * to be called to determine the instance.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withProjectId(String projectId) {
+ return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+ }
+
+ /**
* Returns a new {@link BigtableIO.Read} that will write into the Cloud
Bigtable instance
- * indicated by given parameter, requires {@link #withProjectId(String)}
to be called to
+ * indicated by given parameter, requires {@link #withProjectId} to be
called to
* determine the project.
*
* <p>Does not modify this object.
*/
- public Write withInstanceId(String instanceId) {
+ public Write withInstanceId(ValueProvider<String> instanceId) {
BigtableConfig config = getBigtableConfig();
return
toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build();
}
/**
+ * Returns a new {@link BigtableIO.Write} that will write into the Cloud
Bigtable instance
+ * indicated by given parameter, requires {@link #withProjectId} to be
called to
+ * determine the project.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withInstanceId(String instanceId) {
+ return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} that will write to the specified
table.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withTableId(ValueProvider<String> tableId) {
+ BigtableConfig config = getBigtableConfig();
+ return
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} that will write to the specified
table.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withTableId(String tableId) {
+ return withTableId(ValueProvider.StaticValueProvider.of(tableId));
+ }
+
+ /**
* WARNING: Should be used only to specify additional parameters for
connection to
* the Cloud Bigtable, instanceId and projectId should be provided over
- * {@link #withInstanceId(String)} and {@link #withProjectId(String)}
respectively.
+ * {@link #withInstanceId} and {@link #withProjectId} respectively.
*
* <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud
Bigtable instance
* indicated by the given options, and using any other specified
customizations.
@@ -537,7 +608,7 @@ public class BigtableIO {
/**
* WARNING: Should be used only to specify additional parameters for
connection
* to the Cloud Bigtable, instanceId and projectId should be provided over
- * {@link #withInstanceId(String)} and {@link #withProjectId(String)}
respectively.
+ * {@link #withInstanceId} and {@link #withProjectId} respectively.
*
* <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud
Bigtable instance
* indicated by the given options, and using any other specified
customizations.
@@ -563,7 +634,7 @@ public class BigtableIO {
* with customized options provided by given configurator.
*
* <p>WARNING: instanceId and projectId should not be provided here and
should be provided over
- * {@link #withProjectId(String)} and {@link #withInstanceId(String)}.
+ * {@link #withProjectId} and {@link #withInstanceId}.
*
* <p>Does not modify this object.
*/
@@ -583,16 +654,6 @@ public class BigtableIO {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write to the specified
table.
- *
- * <p>Does not modify this object.
- */
- public Write withTableId(String tableId) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
- }
-
- /**
* Returns a new {@link BigtableIO.Write} that will write using the given
Cloud Bigtable
* service implementation.
*
@@ -609,13 +670,7 @@ public class BigtableIO {
public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input)
{
getBigtableConfig().validate();
- input.apply(ParDo.of(new
BigtableWriterFn(getBigtableConfig().getTableId(),
- new SerializableFunction<PipelineOptions, BigtableService>() {
- @Override
- public BigtableService apply(PipelineOptions options) {
- return getBigtableConfig().getBigtableService(options);
- }
- })));
+ input.apply(ParDo.of(new BigtableWriterFn(getBigtableConfig())));
return PDone.in(input.getPipeline());
}
@@ -639,19 +694,16 @@ public class BigtableIO {
private class BigtableWriterFn extends DoFn<KV<ByteString,
Iterable<Mutation>>, Void> {
- public BigtableWriterFn(String tableId,
- SerializableFunction<PipelineOptions, BigtableService>
bigtableServiceFactory) {
- this.tableId = checkNotNull(tableId, "tableId");
- this.bigtableServiceFactory =
- checkNotNull(bigtableServiceFactory, "bigtableServiceFactory");
+ public BigtableWriterFn(BigtableConfig bigtableConfig) {
+ this.config = bigtableConfig;
this.failures = new ConcurrentLinkedQueue<>();
}
@StartBundle
public void startBundle(StartBundleContext c) throws IOException {
if (bigtableWriter == null) {
- bigtableWriter = bigtableServiceFactory.apply(
- c.getPipelineOptions()).openForWriting(tableId);
+ bigtableWriter = config.getBigtableService(
+
c.getPipelineOptions()).openForWriting(config.getTableId().get());
}
recordsWritten = 0;
}
@@ -687,8 +739,7 @@ public class BigtableIO {
}
///////////////////////////////////////////////////////////////////////////////
- private final String tableId;
- private final SerializableFunction<PipelineOptions, BigtableService>
bigtableServiceFactory;
+ private final BigtableConfig config;
private BigtableService.Writer bigtableWriter;
private long recordsWritten;
private final ConcurrentLinkedQueue<BigtableWriteException> failures;
@@ -756,13 +807,11 @@ public class BigtableIO {
static class BigtableSource extends BoundedSource<Row> {
public BigtableSource(
- SerializableFunction<PipelineOptions, BigtableService> serviceFactory,
- String tableId,
+ BigtableConfig config,
@Nullable RowFilter filter,
List<ByteKeyRange> ranges,
@Nullable Long estimatedSizeBytes) {
- this.serviceFactory = serviceFactory;
- this.tableId = tableId;
+ this.config = config;
this.filter = filter;
this.ranges = ranges;
this.estimatedSizeBytes = estimatedSizeBytes;
@@ -771,7 +820,7 @@ public class BigtableIO {
@Override
public String toString() {
return MoreObjects.toStringHelper(BigtableSource.class)
- .add("tableId", tableId)
+ .add("config", config)
.add("filter", filter)
.add("ranges", ranges)
.add("estimatedSizeBytes", estimatedSizeBytes)
@@ -779,8 +828,7 @@ public class BigtableIO {
}
////// Private state and internal implementation details //////
- private final SerializableFunction<PipelineOptions, BigtableService>
serviceFactory;
- private final String tableId;
+ private final BigtableConfig config;
@Nullable private final RowFilter filter;
private final List<ByteKeyRange> ranges;
@Nullable private Long estimatedSizeBytes;
@@ -791,13 +839,13 @@ public class BigtableIO {
*/
protected BigtableSource withSingleRange(ByteKeyRange range) {
checkArgument(range != null, "range can not be null");
- return new BigtableSource(serviceFactory, tableId, filter,
+ return new BigtableSource(config, filter,
Arrays.asList(range), estimatedSizeBytes);
}
protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
checkArgument(estimatedSizeBytes != null, "estimatedSizeBytes can not be
null");
- return new BigtableSource(serviceFactory, tableId, filter, ranges,
estimatedSizeBytes);
+ return new BigtableSource(config, filter, ranges, estimatedSizeBytes);
}
/**
@@ -807,7 +855,7 @@ public class BigtableIO {
*/
private List<SampleRowKeysResponse> getSampleRowKeys(PipelineOptions
pipelineOptions)
throws IOException {
- return serviceFactory.apply(pipelineOptions).getSampleRowKeys(this);
+ return config.getBigtableService(pipelineOptions).getSampleRowKeys(this);
}
@Override
@@ -960,19 +1008,21 @@ public class BigtableIO {
@Override
public BoundedReader<Row> createReader(PipelineOptions options) throws
IOException {
- return new BigtableReader(this, serviceFactory.apply(options));
+ return new BigtableReader(this, config.getBigtableService(options));
}
@Override
public void validate() {
- checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
+ ValueProvider<String> tableId = config.getTableId();
+ checkArgument(tableId != null && tableId.isAccessible() &&
!tableId.get().isEmpty(),
+ "tableId was not supplied");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("tableId", tableId)
+ builder.add(DisplayData.item("tableId", config.getTableId().get())
.withLabel("Table ID"));
if (filter != null) {
@@ -1030,8 +1080,8 @@ public class BigtableIO {
return filter;
}
- public String getTableId() {
- return tableId;
+ public ValueProvider<String> getTableId() {
+ return config.getTableId();
}
}
@@ -1156,8 +1206,8 @@ public class BigtableIO {
}
static void validateTableExists(BigtableConfig config, PipelineOptions
options) {
- if (config.getValidate()) {
- String tableId = config.getTableId();
+ if (config.getValidate() && config.isDataAccessible()) {
+ String tableId = checkNotNull(config.getTableId().get());
try {
checkArgument(
config.getBigtableService(options).tableExists(tableId),
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 06c459b..af756a8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -119,7 +119,7 @@ class BigtableServiceImpl implements BigtableService {
ReadRowsRequest.Builder requestB =
ReadRowsRequest.newBuilder()
.setRows(rowSet)
-
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId()));
+
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId().get()));
if (source.getRowFilter() != null) {
requestB.setFilter(source.getRowFilter());
}
@@ -249,7 +249,7 @@ class BigtableServiceImpl implements BigtableService {
try (BigtableSession session = new BigtableSession(options)) {
SampleRowKeysRequest request =
SampleRowKeysRequest.newBuilder()
-
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId()))
+
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId().get()))
.build();
return session.getDataClient().sampleRowKeys(request);
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTest.java
new file mode 100644
index 0000000..0f44f8c
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLabel;
+import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
+import static org.hamcrest.Matchers.allOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.BulkOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link BigtableConfig}.
+ */
+@RunWith(JUnit4.class)
+public class BigtableConfigTest {
+
+ static final ValueProvider<String> NOT_ACCESSIBLE_VALUE = new
ValueProvider<String>() {
+ @Override
+ public String get() {
+ throw new IllegalStateException("Value is not accessible");
+ }
+
+ @Override
+ public boolean isAccessible() {
+ return false;
+ }
+ };
+
+ static final ValueProvider<String> PROJECT_ID =
+ ValueProvider.StaticValueProvider.of("project_id");
+
+ static final ValueProvider<String> INSTANCE_ID =
+ ValueProvider.StaticValueProvider.of("instance_id");
+
+ static final ValueProvider<String> TABLE_ID =
ValueProvider.StaticValueProvider.of("table");
+
+ static final
+ SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
CONFIGURATOR =
+ new SerializableFunction<BigtableOptions.Builder,
BigtableOptions.Builder>() {
+ @Override
+ public BigtableOptions.Builder apply(BigtableOptions.Builder input) {
+ return input;
+ }
+ };
+
+ static final BigtableService SERVICE = Mockito.mock(BigtableService.class);
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private BigtableConfig config;
+
+ @Before
+ public void setup() throws Exception {
+ config = BigtableConfig.builder().setValidate(false).build();
+ }
+
+ @Test
+ public void testWithProjectId() {
+ assertEquals(PROJECT_ID.get(),
config.withProjectId(PROJECT_ID).getProjectId().get());
+
+ thrown.expect(IllegalArgumentException.class);
+ config.withProjectId(null);
+ }
+
+ @Test
+ public void testWithInstanceId() {
+ assertEquals(INSTANCE_ID.get(),
config.withInstanceId(INSTANCE_ID).getInstanceId().get());
+
+ thrown.expect(IllegalArgumentException.class);
+ config.withInstanceId(null);
+ }
+
+ @Test
+ public void testWithTableId() {
+ assertEquals(TABLE_ID.get(),
config.withTableId(TABLE_ID).getTableId().get());
+
+ thrown.expect(IllegalArgumentException.class);
+ config.withTableId(null);
+ }
+
+ @Test
+ public void testWithBigtableOptionsConfigurator() {
+ assertEquals(CONFIGURATOR,
+
config.withBigtableOptionsConfigurator(CONFIGURATOR).getBigtableOptionsConfigurator());
+
+ thrown.expect(IllegalArgumentException.class);
+ config.withBigtableOptionsConfigurator(null);
+ }
+
+ @Test
+ public void testWithValidate() {
+ assertEquals(true, config.withValidate(true).getValidate());
+ }
+
+ @Test
+ public void testWithBigtableService() {
+ assertEquals(SERVICE,
config.withBigtableService(SERVICE).getBigtableService());
+
+ thrown.expect(IllegalArgumentException.class);
+ config.withBigtableService(null);
+ }
+
+ @Test
+ public void testValidate() {
+ config.withProjectId(PROJECT_ID)
+ .withInstanceId(INSTANCE_ID)
+ .withTableId(TABLE_ID)
+ .validate();
+ }
+
+ @Test
+ public void testValidateFailsWithoutProjectId() {
+ // with*() returns a new config, so keep the result; only the project id is left unset.
+ BigtableConfig withoutProjectId = config.withInstanceId(INSTANCE_ID).withTableId(TABLE_ID);
+
+ thrown.expect(IllegalArgumentException.class);
+ withoutProjectId.validate();
+ }
+
+ @Test
+ public void testValidateFailsWithoutInstanceId() {
+ // with*() returns a new config, so keep the result; only the instance id is left unset.
+ BigtableConfig withoutInstanceId = config.withProjectId(PROJECT_ID).withTableId(TABLE_ID);
+
+ thrown.expect(IllegalArgumentException.class);
+ withoutInstanceId.validate();
+ }
+
+ @Test
+ public void testValidateFailsWithoutTableId() {
+ // with*() returns a new config, so keep the result; only the table id is left unset.
+ BigtableConfig withoutTableId = config.withProjectId(PROJECT_ID).withInstanceId(INSTANCE_ID);
+
+ thrown.expect(IllegalArgumentException.class);
+ withoutTableId.validate();
+ }
+
+ @Test
+ public void testPopulateDisplayData() {
+ DisplayData displayData = DisplayData.from(new HasDisplayData() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ config.withProjectId(PROJECT_ID)
+ .withInstanceId(INSTANCE_ID)
+ .withTableId(TABLE_ID)
+ .populateDisplayData(builder);
+ }
+ });
+
+ assertThat(displayData, hasDisplayItem(allOf(
+ hasKey("projectId"),
+ hasLabel("Bigtable Project Id"),
+ hasValue(PROJECT_ID.get()))));
+
+ assertThat(displayData, hasDisplayItem(allOf(
+ hasKey("instanceId"),
+ hasLabel("Bigtable Instance Id"),
+ hasValue(INSTANCE_ID.get()))));
+
+ assertThat(displayData, hasDisplayItem(allOf(
+ hasKey("tableId"),
+ hasLabel("Bigtable Table Id"),
+ hasValue(TABLE_ID.get()))));
+ }
+
+ @Test
+ public void testGetBigtableServiceWithDefaultService() {
+ assertEquals(SERVICE,
config.withBigtableService(SERVICE).getBigtableService());
+ }
+
+ @Test
+ public void testGetBigtableServiceWithConfigurator() {
+ SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
configurator =
+ new SerializableFunction<BigtableOptions.Builder,
BigtableOptions.Builder>() {
+ @Override
+ public BigtableOptions.Builder apply(BigtableOptions.Builder input) {
+ return input
+ .setInstanceId(INSTANCE_ID.get() + INSTANCE_ID.get())
+ .setProjectId(PROJECT_ID.get() + PROJECT_ID.get())
+ .setBulkOptions(new
BulkOptions.Builder().setUseBulkApi(true).build());
+ }
+ };
+
+ BigtableService service = config
+ .withProjectId(PROJECT_ID)
+ .withInstanceId(INSTANCE_ID)
+ .withBigtableOptionsConfigurator(configurator)
+ .getBigtableService(PipelineOptionsFactory.as(GcpOptions.class));
+
+ assertEquals(PROJECT_ID.get(),
service.getBigtableOptions().getProjectId());
+ assertEquals(INSTANCE_ID.get(),
service.getBigtableOptions().getInstanceId());
+ assertEquals(true,
service.getBigtableOptions().getBulkOptions().useBulkApi());
+ }
+
+ @Test
+ public void testIsDataAccessible() {
+
assertTrue(config.withTableId(TABLE_ID).withProjectId(PROJECT_ID).withInstanceId(INSTANCE_ID)
+ .isDataAccessible());
+ assertTrue(config.withTableId(TABLE_ID).withProjectId(PROJECT_ID)
+ .withBigtableOptions(new
BigtableOptions.Builder().setInstanceId("instance_id").build())
+ .isDataAccessible());
+ assertTrue(config.withTableId(TABLE_ID).withInstanceId(INSTANCE_ID)
+ .withBigtableOptions(new
BigtableOptions.Builder().setProjectId("project_id").build())
+ .isDataAccessible());
+ assertTrue(config.withTableId(TABLE_ID).withBigtableOptions(
+ new
BigtableOptions.Builder().setProjectId("project_id").setInstanceId("instance_id").build())
+ .isDataAccessible());
+
+
assertFalse(config.withTableId(NOT_ACCESSIBLE_VALUE).withProjectId(PROJECT_ID)
+ .withInstanceId(INSTANCE_ID).isDataAccessible());
+
assertFalse(config.withTableId(TABLE_ID).withProjectId(NOT_ACCESSIBLE_VALUE)
+ .withInstanceId(INSTANCE_ID).isDataAccessible());
+ assertFalse(config.withTableId(TABLE_ID).withProjectId(PROJECT_ID)
+ .withInstanceId(NOT_ACCESSIBLE_VALUE).isDataAccessible());
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 4b878a4..fe894ca 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -86,8 +86,8 @@ import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -115,14 +115,20 @@ public class BigtableIOTest {
@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
- private static FakeBigtableService service;
- private static SerializableFunction<PipelineOptions, BigtableService>
serviceFactory =
- new SerializableFunction<PipelineOptions, BigtableService>() {
- @Override
- public BigtableService apply(PipelineOptions input) {
- return service;
- }
+ static final ValueProvider<String> NOT_ACCESSIBLE_VALUE = new
ValueProvider<String>() {
+ @Override
+ public String get() {
+ throw new IllegalStateException("Value is not accessible");
+ }
+
+ @Override
+ public boolean isAccessible() {
+ return false;
+ }
};
+
+ private static BigtableConfig config;
+ private static FakeBigtableService service;
private static final BigtableOptions BIGTABLE_OPTIONS =
new BigtableOptions.Builder()
.setProjectId("options_project")
@@ -151,6 +157,11 @@ public class BigtableIOTest {
defaultRead = defaultRead.withBigtableService(service);
defaultWrite = defaultWrite.withBigtableService(service);
bigtableCoder = p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
+
+ config = BigtableConfig.builder()
+ .setValidate(true)
+ .setBigtableService(service)
+ .build();
}
private static ByteKey makeByteKey(ByteString key) {
@@ -167,8 +178,8 @@ public class BigtableIOTest {
.withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
assertEquals("options_project", read.getBigtableOptions().getProjectId());
assertEquals("options_instance",
read.getBigtableOptions().getInstanceId());
- assertEquals("instance", read.getBigtableConfig().getInstanceId());
- assertEquals("project", read.getBigtableConfig().getProjectId());
+ assertEquals("instance", read.getBigtableConfig().getInstanceId().get());
+ assertEquals("project", read.getBigtableConfig().getProjectId().get());
assertEquals("table", read.getTableId());
assertEquals(PORT_CONFIGURATOR,
read.getBigtableConfig().getBigtableOptionsConfigurator());
}
@@ -178,7 +189,6 @@ public class BigtableIOTest {
BigtableIO.Read read =
BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
thrown.expect(IllegalArgumentException.class);
-
read.expand(null);
}
@@ -222,11 +232,11 @@ public class BigtableIOTest {
.withTableId("table")
.withInstanceId("instance")
.withProjectId("project");
- assertEquals("table", write.getBigtableConfig().getTableId());
+ assertEquals("table", write.getBigtableConfig().getTableId().get());
assertEquals("options_project", write.getBigtableOptions().getProjectId());
assertEquals("options_instance",
write.getBigtableOptions().getInstanceId());
- assertEquals("instance", write.getBigtableConfig().getInstanceId());
- assertEquals("project", write.getBigtableConfig().getProjectId());
+ assertEquals("instance", write.getBigtableConfig().getInstanceId().get());
+ assertEquals("project", write.getBigtableConfig().getProjectId().get());
}
@Test
@@ -532,12 +542,8 @@ public class BigtableIOTest {
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
BigtableSource source =
- new BigtableSource(
- serviceFactory,
- table,
- null,
- Arrays.asList(service.getTableRange(table)),
- null);
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null,
+ Arrays.asList(service.getTableRange(table)), null);
assertSplitAtFractionExhaustive(source, null);
}
@@ -554,11 +560,8 @@ public class BigtableIOTest {
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
BigtableSource source =
- new BigtableSource(serviceFactory,
- table,
- null,
- Arrays.asList(service.getTableRange(table)),
- null);
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+ null, Arrays.asList(service.getTableRange(table)), null);
// With 0 items read, all split requests will fail.
assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
@@ -588,8 +591,7 @@ public class BigtableIOTest {
// Generate source and split it.
BigtableSource source =
- new BigtableSource(serviceFactory,
- table,
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
Arrays.asList(ByteKeyRange.ALL_KEYS),
null /*size*/);
@@ -626,14 +628,12 @@ public class BigtableIOTest {
tableRange.withStartKey(splitKey2));
// Generate source and split it.
BigtableSource source =
- new BigtableSource(serviceFactory,
- table,
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
keyRanges,
null /*size*/);
BigtableSource referenceSource =
- new BigtableSource(serviceFactory,
- table,
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
ImmutableList.of(service.getTableRange(table)),
null /*size*/);
@@ -660,8 +660,7 @@ public class BigtableIOTest {
// Generate source and split it.
BigtableSource source =
- new BigtableSource(serviceFactory,
- table,
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
Arrays.asList(ByteKeyRange.ALL_KEYS),
null /*size*/);
@@ -702,14 +701,12 @@ public class BigtableIOTest {
tableRange.withStartKey(splitKey2));
// Generate source and split it.
BigtableSource source =
- new BigtableSource(serviceFactory,
- table,
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
keyRanges,
null /*size*/);
BigtableSource referenceSource =
- new BigtableSource(serviceFactory,
- table,
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
ImmutableList.of(service.getTableRange(table)),
null /*size*/);
@@ -737,12 +734,8 @@ public class BigtableIOTest {
RowFilter filter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
BigtableSource source =
- new BigtableSource(
- serviceFactory,
- table,
- filter,
- Arrays.asList(ByteKeyRange.ALL_KEYS),
- null /*size*/);
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+ filter, Arrays.asList(ByteKeyRange.ALL_KEYS), null /*size*/);
List<BigtableSource> splits = source.split(numRows * bytesPerRow /
numSplits, null);
// Test num splits and split equality.
@@ -767,7 +760,7 @@ public class BigtableIOTest {
assertThat(displayData, hasDisplayItem(allOf(
hasKey("tableId"),
- hasLabel("Table ID"),
+ hasLabel("Bigtable Table Id"),
hasValue("fooTable"))));
assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
@@ -867,6 +860,18 @@ public class BigtableIOTest {
p.run();
}
+ /** Tests that the table existence check is skipped when the table id is not accessible. */
+ @Test
+ public void testTableCheckIgnoredWhenCanNotAccessConfig() throws Exception {
+ PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
+ p.apply(
+ Create.empty(
+ KvCoder.of(ByteStringCoder.of(),
IterableCoder.of(ProtoCoder.of(Mutation.class)))));
+
+ emptyInput.apply("write", defaultWrite.withTableId(NOT_ACCESSIBLE_VALUE));
+ p.run();
+ }
+
/** Tests that when writing an element fails, the write fails. */
@Test
public void testWritingFailsBadElement() throws Exception {
@@ -903,7 +908,8 @@ public class BigtableIOTest {
makeTableData(table, numRows);
BigtableSource source =
- new BigtableSource(serviceFactory, table, null,
Arrays.asList(ByteKeyRange.ALL_KEYS), null);
+ new
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+ null, Arrays.asList(ByteKeyRange.ALL_KEYS), null);
BoundedReader<Row> reader =
source.createReader(TestPipeline.testingPipelineOptions());
@@ -1058,8 +1064,8 @@ public class BigtableIOTest {
@Override
public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source)
{
- List<SampleRowKeysResponse> samples =
sampleRowKeys.get(source.getTableId());
- checkNotNull(samples, "No samples found for table %s",
source.getTableId());
+ List<SampleRowKeysResponse> samples =
sampleRowKeys.get(source.getTableId().get());
+ checkNotNull(samples, "No samples found for table %s",
source.getTableId().get());
return samples;
}
@@ -1114,12 +1120,12 @@ public class BigtableIOTest {
checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is
supported");
filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
}
- service.verifyTableExists(source.getTableId());
+ service.verifyTableExists(source.getTableId().get());
}
@Override
public boolean start() {
- rows = service.tables.get(source.getTableId()).entrySet().iterator();
+ rows =
service.tables.get(source.getTableId().get()).entrySet().iterator();
return advance();
}
--
To stop receiving notification emails like this one, please contact
[email protected].