igorbernstein2 commented on code in PR #24015:
URL: https://github.com/apache/beam/pull/24015#discussion_r1123284678
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -829,56 +1008,95 @@ public final String toString() {
PCollection<KV<ByteString, Iterable<Mutation>>>,
PCollection<BigtableWriteResult>> {
private final BigtableConfig bigtableConfig;
+ private final BigtableWriteOptions bigtableWriteOptions;
- WriteWithResults(BigtableConfig bigtableConfig) {
+ private final BigtableServiceFactory factory;
+
+ WriteWithResults(
+ BigtableConfig bigtableConfig,
+ BigtableWriteOptions bigtableWriteOptions,
+ BigtableServiceFactory factory) {
this.bigtableConfig = bigtableConfig;
+ this.bigtableWriteOptions = bigtableWriteOptions;
+ this.factory = factory;
}
@Override
public PCollection<BigtableWriteResult> expand(
PCollection<KV<ByteString, Iterable<Mutation>>> input) {
bigtableConfig.validate();
+ bigtableWriteOptions.validate();
- return input.apply(ParDo.of(new BigtableWriterFn(bigtableConfig)));
+ return input.apply(
+ ParDo.of(new BigtableWriterFn(factory, bigtableConfig,
bigtableWriteOptions)));
}
@Override
public void validate(PipelineOptions options) {
- validateTableExists(bigtableConfig, options);
+ validateTableExists(bigtableConfig, bigtableWriteOptions, options);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
bigtableConfig.populateDisplayData(builder);
+ bigtableWriteOptions.populateDisplayData(builder);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(WriteWithResults.class)
.add("config", bigtableConfig)
+ .add("writeOptions", bigtableWriteOptions)
.toString();
}
+
+ private void validateTableExists(
+ BigtableConfig config, BigtableWriteOptions writeOptions,
PipelineOptions options) {
+ if (config.getValidate() && config.isDataAccessible() &&
writeOptions.isDataAccessible()) {
+ String tableId = checkNotNull(writeOptions.getTableId().get());
+ try {
+ checkArgument(
+ factory.checkTableExists(config, options,
writeOptions.getTableId().get()),
+ "Table %s does not exist",
+ tableId);
+ } catch (IOException e) {
+ LOG.warn("Error checking whether table {} exists; proceeding.",
tableId, e);
+ }
+ }
+ }
}
private static class BigtableWriterFn
extends DoFn<KV<ByteString, Iterable<Mutation>>, BigtableWriteResult> {
- BigtableWriterFn(BigtableConfig bigtableConfig) {
+ private final BigtableServiceFactory factory;
+ private final BigtableServiceFactory.ConfigId id;
+
+ // Assign serviceEntry in startBundle and clear it in tearDown.
+ @Nullable private BigtableServiceEntry serviceEntry;
+
+ BigtableWriterFn(
+ BigtableServiceFactory factory,
+ BigtableConfig bigtableConfig,
+ BigtableWriteOptions writeOptions) {
+ this.factory = factory;
this.config = bigtableConfig;
+ this.writeOptions = writeOptions;
this.failures = new ConcurrentLinkedQueue<>();
+ this.id = factory.newId();
}
@StartBundle
public void startBundle(StartBundleContext c) throws IOException {
- if (bigtableWriter == null) {
- bigtableWriter =
- config
- .getBigtableService(c.getPipelineOptions())
- .openForWriting(config.getTableId().get());
- }
recordsWritten = 0;
this.seenWindows = Maps.newHashMapWithExpectedSize(1);
+
+ if (bigtableWriter == null) {
+ serviceEntry =
+ factory.getServiceForWriting(id, config, writeOptions,
c.getPipelineOptions());
+ bigtableWriter =
serviceEntry.getService().openForWriting(writeOptions.getTableId().get());
+ }
}
@ProcessElement
Review Comment:
This DoFn has a bunch of additional logic for checking for failures. In a
future PR, we should just use the error tracking veneer's batching logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]