Repository: beam Updated Branches: refs/heads/master 13c06bf79 -> 42066431b
BigtableIO should use AutoValue for read and write Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8d82981 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8d82981 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8d82981 Branch: refs/heads/master Commit: e8d829817e3f7e07884a91737b61bfa5133d5724 Parents: 13c06bf Author: Borisa Zivkovic <[email protected]> Authored: Fri May 12 08:44:20 2017 +0100 Committer: Eugene Kirpichov <[email protected]> Committed: Fri May 12 10:56:41 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 237 +++++++++---------- 1 file changed, 114 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e8d82981/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index bde7ea5..22e9f36 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.Row; @@ -164,7 +165,8 @@ public class BigtableIO { */ @Experimental public static Read read() { - return new Read(null, "", ByteKeyRange.ALL_KEYS, null, null); + return new AutoValue_BigtableIO_Read.Builder().setKeyRange(ByteKeyRange.ALL_KEYS).setTableId("") + .build(); } /** @@ -176,7 +178,7 @@ public class BigtableIO { */ @Experimental public static Write write() { - return new Write(null, "", null); + return new AutoValue_BigtableIO_Write.Builder().setTableId("").build(); } /** @@ -186,7 +188,46 @@ public class BigtableIO { * @see BigtableIO */ @Experimental - public static class Read extends PTransform<PBegin, PCollection<Row>> { + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<Row>> { + + @Nullable + abstract RowFilter getRowFilter(); + + /** Returns the range of keys that will be read from the table. */ + @Nullable + public abstract ByteKeyRange getKeyRange(); + + /** Returns the table being read from. */ + @Nullable + public abstract String getTableId(); + + @Nullable + abstract BigtableService getBigtableService(); + + + /** Returns the Google Cloud Bigtable instance being read from, and other parameters. */ + @Nullable + public abstract BigtableOptions getBigtableOptions(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setRowFilter(RowFilter filter); + + abstract Builder setKeyRange(ByteKeyRange keyRange); + + abstract Builder setTableId(String tableId); + + abstract Builder setBigtableOptions(BigtableOptions options); + + abstract Builder setBigtableService(BigtableService bigtableService); + + abstract Read build(); + } + /** * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance * indicated by the given options, and using any other specified customizations. @@ -217,7 +258,7 @@ public class BigtableIO { BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build(); - return new Read(optionsWithAgent, tableId, keyRange, filter, bigtableService); + return toBuilder().setBigtableOptions(optionsWithAgent).build(); } /** @@ -228,7 +269,7 @@ public class BigtableIO { */ public Read withRowFilter(RowFilter filter) { checkNotNull(filter, "filter"); - return new Read(options, tableId, keyRange, filter, bigtableService); + return toBuilder().setRowFilter(filter).build(); } /** @@ -238,7 +279,7 @@ public class BigtableIO { */ public Read withKeyRange(ByteKeyRange keyRange) { checkNotNull(keyRange, "keyRange"); - return new Read(options, tableId, keyRange, filter, bigtableService); + return toBuilder().setKeyRange(keyRange).build(); } /** @@ -248,29 +289,7 @@ public class BigtableIO { */ public Read withTableId(String tableId) { checkNotNull(tableId, "tableId"); - return new Read(options, tableId, keyRange, filter, bigtableService); - } - - /** - * Returns the Google Cloud Bigtable instance being read from, and other parameters. - */ - public BigtableOptions getBigtableOptions() { - return options; - } - - /** - * Returns the range of keys that will be read from the table. By default, returns - * {@link ByteKeyRange#ALL_KEYS} to scan the entire table. - */ - public ByteKeyRange getKeyRange() { - return keyRange; - } - - /** - * Returns the table being read from. - */ - public String getTableId() { - return tableId; + return toBuilder().setTableId(tableId).build(); } @Override @@ -281,21 +300,21 @@ public class BigtableIO { public BigtableService apply(PipelineOptions options) { return getBigtableService(options); } - }, tableId, filter, keyRange, null); + }, getTableId(), getRowFilter(), getKeyRange(), null); return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); } @Override public void validate(PipelineOptions options) { - checkArgument(this.options != null, "BigtableOptions not specified"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); + checkArgument(getBigtableOptions() != null, "BigtableOptions not specified"); + checkArgument(getTableId() != null && !getTableId().isEmpty(), "Table ID not specified"); try { checkArgument( - getBigtableService(options).tableExists(tableId), + getBigtableService(options).tableExists(getTableId()), "Table %s does not exist", - tableId); + getTableId()); } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e); } } @@ -303,19 +322,19 @@ public class BigtableIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("tableId", tableId) + builder.add(DisplayData.item("tableId", getTableId()) .withLabel("Table ID")); - if (options != null) { - builder.add(DisplayData.item("bigtableOptions", options.toString()) + if (getBigtableOptions() != null) { + builder.add(DisplayData.item("bigtableOptions", getBigtableOptions().toString()) .withLabel("Bigtable Options")); } builder.addIfNotDefault( - DisplayData.item("keyRange", keyRange.toString()), ByteKeyRange.ALL_KEYS.toString()); + DisplayData.item("keyRange", getKeyRange().toString()), ByteKeyRange.ALL_KEYS.toString()); - if (filter != null) { - builder.add(DisplayData.item("rowFilter", filter.toString()) + if (getRowFilter() != null) { + builder.add(DisplayData.item("rowFilter", getRowFilter().toString()) .withLabel("Table Row Filter")); } } @@ -323,38 +342,13 @@ public class BigtableIO { @Override public String toString() { return MoreObjects.toStringHelper(Read.class) - .add("options", options) - .add("tableId", tableId) - .add("keyRange", keyRange) - .add("filter", filter) + .add("options", getBigtableOptions()) + .add("tableId", getTableId()) + .add("keyRange", getKeyRange()) + .add("filter", getRowFilter()) .toString(); } - ///////////////////////////////////////////////////////////////////////////////////////// - /** - * Used to define the Cloud Bigtable instance and any options for the networking layer. - * Cannot actually be {@code null} at validation time, but may start out {@code null} while - * source is being built. - */ - @Nullable private final BigtableOptions options; - private final String tableId; - private final ByteKeyRange keyRange; - @Nullable private final RowFilter filter; - @Nullable private final BigtableService bigtableService; - - private Read( - @Nullable BigtableOptions options, - String tableId, - ByteKeyRange keyRange, - @Nullable RowFilter filter, - @Nullable BigtableService bigtableService) { - this.options = options; - this.tableId = checkNotNull(tableId, "tableId"); - this.keyRange = checkNotNull(keyRange, "keyRange"); - this.filter = filter; - this.bigtableService = bigtableService; - } - /** * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable * service implementation. @@ -365,7 +359,7 @@ public class BigtableIO { */ Read withBigtableService(BigtableService bigtableService) { checkNotNull(bigtableService, "bigtableService"); - return new Read(options, tableId, keyRange, filter, bigtableService); + return toBuilder().setBigtableService(bigtableService).build(); } /** @@ -378,11 +372,12 @@ public class BigtableIO { */ @VisibleForTesting BigtableService getBigtableService(PipelineOptions pipelineOptions) { - if (bigtableService != null) { - return bigtableService; + if (getBigtableService() != null) { + return getBigtableService(); } - BigtableOptions.Builder clonedOptions = options.toBuilder(); - if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) { + BigtableOptions.Builder clonedOptions = getBigtableOptions().toBuilder(); + if (getBigtableOptions().getCredentialOptions() + .getCredentialType() == CredentialType.DefaultCredentials) { clonedOptions.setCredentialOptions( CredentialOptions.credential( pipelineOptions.as(GcpOptions.class).getGcpCredential())); @@ -398,24 +393,33 @@ public class BigtableIO { * @see BigtableIO */ @Experimental - public static class Write + @AutoValue + public abstract static class Write extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> { - /** - * Used to define the Cloud Bigtable instance and any options for the networking layer. - * Cannot actually be {@code null} at validation time, but may start out {@code null} while - * source is being built. - */ - @Nullable private final BigtableOptions options; - private final String tableId; - @Nullable private final BigtableService bigtableService; - private Write( - @Nullable BigtableOptions options, - String tableId, - @Nullable BigtableService bigtableService) { - this.options = options; - this.tableId = checkNotNull(tableId, "tableId"); - this.bigtableService = bigtableService; + /** Returns the table being written to. */ + @Nullable + abstract String getTableId(); + + @Nullable + abstract BigtableService getBigtableService(); + + /** Returns the Google Cloud Bigtable instance being written to, and other parameters. */ + @Nullable + public abstract BigtableOptions getBigtableOptions(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setTableId(String tableId); + + abstract Builder setBigtableOptions(BigtableOptions options); + + abstract Builder setBigtableService(BigtableService bigtableService); + + abstract Write build(); } /** @@ -452,7 +456,7 @@ public class BigtableIO { .setUseCachedDataPool(true); BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build(); - return new Write(optionsWithAgent, tableId, bigtableService); + return toBuilder().setBigtableOptions(optionsWithAgent).build(); } /** @@ -462,26 +466,12 @@ public class BigtableIO { */ public Write withTableId(String tableId) { checkNotNull(tableId, "tableId"); - return new Write(options, tableId, bigtableService); - } - - /** - * Returns the Google Cloud Bigtable instance being written to, and other parameters. - */ - public BigtableOptions getBigtableOptions() { - return options; - } - - /** - * Returns the table being written to. - */ - public String getTableId() { - return tableId; + return toBuilder().setTableId(tableId).build(); } @Override public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) { - input.apply(ParDo.of(new BigtableWriterFn(tableId, + input.apply(ParDo.of(new BigtableWriterFn(getTableId(), new SerializableFunction<PipelineOptions, BigtableService>() { @Override public BigtableService apply(PipelineOptions options) { @@ -493,15 +483,15 @@ public class BigtableIO { @Override public void validate(PipelineOptions options) { - checkArgument(this.options != null, "BigtableOptions not specified"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); + checkArgument(getBigtableOptions() != null, "BigtableOptions not specified"); + checkArgument(getTableId() != null && !getTableId().isEmpty(), "Table ID not specified"); try { checkArgument( - getBigtableService(options).tableExists(tableId), + getBigtableService(options).tableExists(getTableId()), "Table %s does not exist", - tableId); + getTableId()); } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e); } } @@ -515,18 +505,18 @@ public class BigtableIO { */ Write withBigtableService(BigtableService bigtableService) { checkNotNull(bigtableService, "bigtableService"); - return new Write(options, tableId, bigtableService); + return toBuilder().setBigtableService(bigtableService).build(); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("tableId", tableId) + builder.add(DisplayData.item("tableId", getTableId()) .withLabel("Table ID")); - if (options != null) { - builder.add(DisplayData.item("bigtableOptions", options.toString()) + if (getBigtableOptions() != null) { + builder.add(DisplayData.item("bigtableOptions", getBigtableOptions().toString()) .withLabel("Bigtable Options")); } } @@ -534,8 +524,8 @@ public class BigtableIO { @Override public String toString() { return MoreObjects.toStringHelper(Write.class) - .add("options", options) - .add("tableId", tableId) + .add("options", getBigtableOptions()) + .add("tableId", getTableId()) .toString(); } @@ -549,11 +539,12 @@ public class BigtableIO { */ @VisibleForTesting BigtableService getBigtableService(PipelineOptions pipelineOptions) { - if (bigtableService != null) { - return bigtableService; + if (getBigtableService() != null) { + return getBigtableService(); } - BigtableOptions.Builder clonedOptions = options.toBuilder(); - if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) { + BigtableOptions.Builder clonedOptions = getBigtableOptions().toBuilder(); + if (getBigtableOptions().getCredentialOptions() + .getCredentialType() == CredentialType.DefaultCredentials) { clonedOptions.setCredentialOptions( CredentialOptions.credential( pipelineOptions.as(GcpOptions.class).getGcpCredential()));
