igorbernstein2 commented on code in PR #24015:
URL: https://github.com/apache/beam/pull/24015#discussion_r1110319032
##########
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableClientWrapper.java:
##########
@@ -20,49 +20,45 @@
import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteString;
import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteStringUtf8;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auth.Credentials;
-import com.google.bigtable.admin.v2.ColumnFamily;
-import com.google.bigtable.admin.v2.DeleteTableRequest;
-import com.google.bigtable.admin.v2.Table;
-import com.google.bigtable.v2.MutateRowRequest;
-import com.google.bigtable.v2.Mutation;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.config.CredentialOptions;
-import com.google.cloud.bigtable.grpc.BigtableDataClient;
-import com.google.cloud.bigtable.grpc.BigtableSession;
-import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.io.IOException;
import java.io.Serializable;
import org.checkerframework.checker.nullness.qual.Nullable;
class BigtableClientWrapper implements Serializable {
private final BigtableTableAdminClient tableAdminClient;
private final BigtableDataClient dataClient;
- private final BigtableSession session;
- private final BigtableOptions bigtableOptions;
- BigtableClientWrapper(
+ public BigtableClientWrapper(
Review Comment:
Why did this need to become public?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -197,81 +186,19 @@ void populateDisplayData(DisplayData.Builder builder) {
}
}
- /**
- * Helper function that either returns the mock Bigtable service supplied by
{@link
- * #withBigtableService} or creates and returns an implementation that talks
to {@code Cloud
- * Bigtable}.
- *
- * <p>Also populate the credentials option from {@link
GcpOptions#getGcpCredential()} if the
- * default credentials are being used on {@link BigtableOptions}.
- */
- @VisibleForTesting
- BigtableService getBigtableService(PipelineOptions pipelineOptions) {
- if (getBigtableService() != null) {
- return getBigtableService();
- }
-
- BigtableOptions.Builder bigtableOptions =
effectiveUserProvidedBigtableOptions();
-
- bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
-
- if (bigtableOptions.build().getCredentialOptions().getCredentialType()
- == CredentialOptions.CredentialType.DefaultCredentials) {
- bigtableOptions.setCredentialOptions(
-
CredentialOptions.credential(pipelineOptions.as(GcpOptions.class).getGcpCredential()));
- }
-
- return new BigtableServiceImpl(bigtableOptions.build());
- }
-
boolean isDataAccessible() {
- return getTableId().isAccessible()
- && (getProjectId() == null || getProjectId().isAccessible())
+ return (getProjectId() == null || getProjectId().isAccessible())
&& (getInstanceId() == null || getInstanceId().isAccessible());
Review Comment:
What about the app profile ID? Should this also check that it is accessible?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer
settings. */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+class BigtableConfigTranslator {
+
+ /** Translate BigtableConfig and BigtableReadOptions to Veneer settings. */
+ static BigtableDataSettings translateReadToVeneerSettings(
+ @Nonnull BigtableConfig config,
+ @Nonnull BigtableReadOptions options,
+ @Nonnull PipelineOptions pipelineOptions) {
+ BigtableDataSettings.Builder settings = buildBigtableDataSettings(config,
pipelineOptions);
+ return configureReadSettings(settings, options);
+ }
+
+ /** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
+ static BigtableDataSettings translateWriteToVeneerSettings(
+ @Nonnull BigtableConfig config,
+ @Nonnull BigtableWriteOptions options,
+ @Nonnull PipelineOptions pipelineOptions) {
+
+ BigtableDataSettings.Builder settings = buildBigtableDataSettings(config,
pipelineOptions);
+ return configureWriteSettings(settings, options);
+ }
+
+ /** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
+ static BigtableDataSettings translateToVeneerSettings(
+ @Nonnull BigtableConfig config, @Nonnull PipelineOptions
pipelineOptions) {
+
+ return buildBigtableDataSettings(config, pipelineOptions).build();
+ }
+
+ private static BigtableDataSettings.Builder buildBigtableDataSettings(
+ BigtableConfig config, PipelineOptions pipelineOptions) {
+ BigtableDataSettings.Builder dataBuilder;
+ if (!Strings.isNullOrEmpty(config.getEmulatorHost())) {
+ String hostAndPort = config.getEmulatorHost();
+ try {
+ int lastIndexOfCol = hostAndPort.lastIndexOf(":");
+ int port = Integer.parseInt(hostAndPort.substring(lastIndexOfCol + 1));
+ dataBuilder =
+ BigtableDataSettings.newBuilderForEmulator(
+ hostAndPort.substring(0, lastIndexOfCol), port);
+ } catch (NumberFormatException | IndexOutOfBoundsException ex) {
+ throw new RuntimeException("Invalid host/port in BigtableConfig " +
hostAndPort);
+ }
+ } else {
+ dataBuilder = BigtableDataSettings.newBuilder();
+ }
+
+ // Configure target
+ dataBuilder
+ .setProjectId(Objects.requireNonNull(config.getProjectId().get()))
+ .setInstanceId(Objects.requireNonNull(config.getInstanceId().get()));
+ if (config.getAppProfileId() != null
+ && !Strings.isNullOrEmpty(config.getAppProfileId().get())) {
+
dataBuilder.setAppProfileId(Objects.requireNonNull(config.getAppProfileId().get()));
+ }
+
+ if (config.getCredentialFactory() != null) {
+ dataBuilder
+ .stubSettings()
+ .setCredentialsProvider(
+ FixedCredentialsProvider.create(
+ ((GcpOptions)
config.getCredentialFactory()).getGcpCredential()));
+ }
+
+ configureHeaderProvider(dataBuilder.stubSettings(), pipelineOptions);
+
+ return dataBuilder;
+ }
+
+ private static void configureHeaderProvider(
+ StubSettings.Builder<?, ?> stubSettings, PipelineOptions
pipelineOptions) {
+
+ ImmutableMap.Builder<String, String> headersBuilder =
ImmutableMap.<String, String>builder();
+ headersBuilder.putAll(stubSettings.getHeaderProvider().getHeaders());
+ headersBuilder.put(
+ GrpcUtil.USER_AGENT_KEY.name(),
Objects.requireNonNull(pipelineOptions.getUserAgent()));
+
+
stubSettings.setHeaderProvider(FixedHeaderProvider.create(headersBuilder.build()));
+ }
+
+ private static BigtableDataSettings configureWriteSettings(
+ BigtableDataSettings.Builder settings, BigtableWriteOptions
writeOptions) {
+ BigtableBatchingCallSettings.Builder callSettings =
+ settings.stubSettings().bulkMutateRowsSettings();
+ RetrySettings.Builder retrySettings =
callSettings.getRetrySettings().toBuilder();
+ BatchingSettings.Builder batchingSettings =
callSettings.getBatchingSettings().toBuilder();
+ if (writeOptions.getAttemptTimeout() != null) {
Review Comment:
Please add a comment to make it easier to read:
// Set the user-specified attempt timeout, and expand the operation timeout if the user did not explicitly set it.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer
settings. */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+class BigtableConfigTranslator {
+
+ /** Translate BigtableConfig and BigtableReadOptions to Veneer settings. */
+ static BigtableDataSettings translateReadToVeneerSettings(
+ @Nonnull BigtableConfig config,
+ @Nonnull BigtableReadOptions options,
+ @Nonnull PipelineOptions pipelineOptions) {
+ BigtableDataSettings.Builder settings = buildBigtableDataSettings(config,
pipelineOptions);
+ return configureReadSettings(settings, options);
+ }
+
+ /** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
+ static BigtableDataSettings translateWriteToVeneerSettings(
+ @Nonnull BigtableConfig config,
+ @Nonnull BigtableWriteOptions options,
+ @Nonnull PipelineOptions pipelineOptions) {
+
+ BigtableDataSettings.Builder settings = buildBigtableDataSettings(config,
pipelineOptions);
+ return configureWriteSettings(settings, options);
+ }
+
+ /** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
+ static BigtableDataSettings translateToVeneerSettings(
+ @Nonnull BigtableConfig config, @Nonnull PipelineOptions
pipelineOptions) {
+
+ return buildBigtableDataSettings(config, pipelineOptions).build();
+ }
+
+ private static BigtableDataSettings.Builder buildBigtableDataSettings(
+ BigtableConfig config, PipelineOptions pipelineOptions) {
+ BigtableDataSettings.Builder dataBuilder;
+ if (!Strings.isNullOrEmpty(config.getEmulatorHost())) {
+ String hostAndPort = config.getEmulatorHost();
+ try {
+ int lastIndexOfCol = hostAndPort.lastIndexOf(":");
+ int port = Integer.parseInt(hostAndPort.substring(lastIndexOfCol + 1));
+ dataBuilder =
+ BigtableDataSettings.newBuilderForEmulator(
+ hostAndPort.substring(0, lastIndexOfCol), port);
+ } catch (NumberFormatException | IndexOutOfBoundsException ex) {
+ throw new RuntimeException("Invalid host/port in BigtableConfig " +
hostAndPort);
+ }
+ } else {
+ dataBuilder = BigtableDataSettings.newBuilder();
+ }
+
+ // Configure target
+ dataBuilder
+ .setProjectId(Objects.requireNonNull(config.getProjectId().get()))
+ .setInstanceId(Objects.requireNonNull(config.getInstanceId().get()));
+ if (config.getAppProfileId() != null
+ && !Strings.isNullOrEmpty(config.getAppProfileId().get())) {
+
dataBuilder.setAppProfileId(Objects.requireNonNull(config.getAppProfileId().get()));
+ }
+
+ if (config.getCredentialFactory() != null) {
+ dataBuilder
+ .stubSettings()
+ .setCredentialsProvider(
+ FixedCredentialsProvider.create(
+ ((GcpOptions)
config.getCredentialFactory()).getGcpCredential()));
+ }
+
+ configureHeaderProvider(dataBuilder.stubSettings(), pipelineOptions);
+
+ return dataBuilder;
+ }
+
+ private static void configureHeaderProvider(
+ StubSettings.Builder<?, ?> stubSettings, PipelineOptions
pipelineOptions) {
+
+ ImmutableMap.Builder<String, String> headersBuilder =
ImmutableMap.<String, String>builder();
+ headersBuilder.putAll(stubSettings.getHeaderProvider().getHeaders());
+ headersBuilder.put(
+ GrpcUtil.USER_AGENT_KEY.name(),
Objects.requireNonNull(pipelineOptions.getUserAgent()));
+
+
stubSettings.setHeaderProvider(FixedHeaderProvider.create(headersBuilder.build()));
+ }
+
+ private static BigtableDataSettings configureWriteSettings(
+ BigtableDataSettings.Builder settings, BigtableWriteOptions
writeOptions) {
+ BigtableBatchingCallSettings.Builder callSettings =
+ settings.stubSettings().bulkMutateRowsSettings();
+ RetrySettings.Builder retrySettings =
callSettings.getRetrySettings().toBuilder();
+ BatchingSettings.Builder batchingSettings =
callSettings.getBatchingSettings().toBuilder();
+ if (writeOptions.getAttemptTimeout() != null) {
+
retrySettings.setInitialRpcTimeout(Duration.ofMillis(writeOptions.getAttemptTimeout()));
+
+ if (writeOptions.getOperationTimeout() == null) {
+ retrySettings.setTotalTimeout(
+ Duration.ofMillis(
+ Math.max(
+ retrySettings.getTotalTimeout().toMillis(),
writeOptions.getAttemptTimeout())));
+ }
+ }
+
+ if (writeOptions.getOperationTimeout() != null) {
+
retrySettings.setTotalTimeout(Duration.ofMillis(writeOptions.getOperationTimeout()));
+ }
+
+ if (writeOptions.getRetryInitialDelay() != null) {
+
retrySettings.setInitialRetryDelay(Duration.ofMillis(writeOptions.getRetryInitialDelay()));
+ }
+
+ if (writeOptions.getRetryDelayMultiplier() != null) {
+
retrySettings.setRetryDelayMultiplier(writeOptions.getRetryDelayMultiplier());
+ }
+
+ if (writeOptions.getBatchElements() != null) {
+
batchingSettings.setElementCountThreshold(writeOptions.getBatchElements());
+ }
+
+ if (writeOptions.getBatchBytes() != null) {
+ batchingSettings.setRequestByteThreshold(writeOptions.getBatchBytes());
+ }
+
+ if (writeOptions.getMaxRequests() != null) {
+ BatchingSettings tmpSettings = batchingSettings.build();
+ batchingSettings =
+ batchingSettings.setFlowControlSettings(
+ callSettings
+ .getBatchingSettings()
+ .getFlowControlSettings()
+ .toBuilder()
+ .setMaxOutstandingElementCount(
+ tmpSettings.getElementCountThreshold() *
writeOptions.getMaxRequests())
+ .setMaxOutstandingRequestBytes(
+ tmpSettings.getRequestByteThreshold() *
writeOptions.getMaxRequests())
+ .build());
+ }
+
+ settings
+ .stubSettings()
+ .bulkMutateRowsSettings()
+ .setRetrySettings(retrySettings.build())
+ .setBatchingSettings(batchingSettings.build());
+
+ return settings.build();
+ }
+
+ private static BigtableDataSettings configureReadSettings(
+ BigtableDataSettings.Builder settings, BigtableReadOptions readOptions) {
+
+ RetrySettings.Builder retrySettings =
+
settings.stubSettings().readRowsSettings().getRetrySettings().toBuilder();
+
+ if (readOptions.getAttemptTimeout() != null) {
+
retrySettings.setInitialRpcTimeout(Duration.ofMillis(readOptions.getAttemptTimeout()));
+
+ if (readOptions.getOperationTimeout() == null) {
+ retrySettings.setTotalTimeout(
+ Duration.ofMillis(
+ Math.max(
+ retrySettings.getTotalTimeout().toMillis(),
readOptions.getAttemptTimeout())));
+ }
+ }
+
+ if (readOptions.getOperationTimeout() != null) {
+
retrySettings.setTotalTimeout(Duration.ofMillis(readOptions.getOperationTimeout()));
+ }
+
+ if (readOptions.getRetryDelayMultiplier() != null) {
+
retrySettings.setRetryDelayMultiplier(readOptions.getRetryDelayMultiplier());
+ }
+
+ if (readOptions.getRetryInitialDelay() != null) {
+
retrySettings.setInitialRetryDelay(Duration.ofMillis(readOptions.getRetryInitialDelay()));
+ }
+
+
settings.stubSettings().readRowsSettings().setRetrySettings(retrySettings.build());
+
+ return settings.build();
+ }
+
+ /** Translate BigtableOptions to BigtableConfig. */
Review Comment:
Please add a note that this is meant for backwards compatibility.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -68,12 +63,15 @@ public abstract class BigtableConfig implements
Serializable {
/** Weather validate that table exists before writing. */
abstract boolean getValidate();
- /** {@link BigtableService} used only for testing. */
- abstract @Nullable BigtableService getBigtableService();
-
- /** Bigtable emulator. Used only for testing. */
+ /** Bigtable emulator. */
abstract @Nullable String getEmulatorHost();
+ /** User agent for this job. */
+ abstract @Nullable String getUserAgent();
+
+ /** Credentials for running the job. */
Review Comment:
You might want to mention that this will default to the CredentialFactory in
PipelineOptions/GcpOptions if it's null.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -521,28 +563,72 @@ public Read withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Read} that will read using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Read} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
- *
- * <p>Does not modify this object.
*/
@VisibleForTesting
- Read withBigtableService(BigtableService bigtableService) {
+ public Read withEmulator(String emulatorHost) {
BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+ return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
}
/**
- * Returns a new {@link BigtableIO.Read} that will use an official
Bigtable emulator.
+ * Configures the attempt timeout in milliseconds of the reads.
*
- * <p>This is used for testing.
+ * <p>Does not modify this object.
*/
- @VisibleForTesting
- public Read withEmulator(String emulatorHost) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+ public Read withAttemptTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "attempt timeout must be positive");
+ BigtableReadOptions readOptions = getBigtableReadOptions();
+ return toBuilder()
+
.setBigtableReadOptions(readOptions.toBuilder().setAttemptTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Configures the operation timeout in milliseconds of the reads.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withOperationTimeout(long timeoutMs) {
Review Comment:
Should this take a Duration instead of a long in milliseconds?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -764,35 +894,119 @@ public Write withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
+ */
+ @VisibleForTesting
+ public Write withEmulator(String emulatorHost) {
+ BigtableConfig config = getBigtableConfig();
+ return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the attempt timeout in
milliseconds for writes.
*
* <p>Does not modify this object.
*/
- Write withBigtableService(BigtableService bigtableService) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+ public Write withAttemptTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "attempt timeout must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setAttemptTimeout(timeoutMs).build())
+ .build();
}
/**
- * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
+ * Returns a new {@link BigtableIO.Write} with the operation timeout in
milliseconds for writes.
*
- * <p>This is used for testing.
+ * <p>Does not modify this object.
+ */
+ public Write withOperationTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "operation timeout must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setOperationTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the retry delay in
milliseconds.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withRetryInitialDelay(long delayMs) {
+ checkArgument(delayMs > 0, "delay must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setRetryInitialDelay(delayMs).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with retry multiplier.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withRetryDelayMultiplier(double multiplier) {
+ checkArgument(multiplier > 0, "multiplier must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setRetryDelayMultiplier(multiplier).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the number of elements in a
batch.
Review Comment:
I think you need to explain what a batch is.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -764,35 +894,119 @@ public Write withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
+ */
+ @VisibleForTesting
Review Comment:
I think this should be available to end users so they can test their pipelines
using the direct runner and the emulator.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -764,35 +894,119 @@ public Write withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
+ */
+ @VisibleForTesting
+ public Write withEmulator(String emulatorHost) {
+ BigtableConfig config = getBigtableConfig();
+ return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the attempt timeout in
milliseconds for writes.
*
* <p>Does not modify this object.
*/
- Write withBigtableService(BigtableService bigtableService) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+ public Write withAttemptTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "attempt timeout must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setAttemptTimeout(timeoutMs).build())
+ .build();
}
/**
- * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
+ * Returns a new {@link BigtableIO.Write} with the operation timeout in
milliseconds for writes.
*
- * <p>This is used for testing.
+ * <p>Does not modify this object.
+ */
+ public Write withOperationTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "operation timeout must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setOperationTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the retry delay in
milliseconds.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withRetryInitialDelay(long delayMs) {
+ checkArgument(delayMs > 0, "delay must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setRetryInitialDelay(delayMs).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with retry multiplier.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withRetryDelayMultiplier(double multiplier) {
+ checkArgument(multiplier > 0, "multiplier must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setRetryDelayMultiplier(multiplier).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the number of elements in a
batch.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withBatchElements(long size) {
+ checkArgument(size > 0, "batch element size must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setBatchElements(size).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the number of bytes in a
batch.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withBatchBytes(long size) {
+ checkArgument(size > 0, "batch byte size must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setBatchBytes(size).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the max number of
concurrent requests.
+ *
+ * <p>Does not modify this object.
*/
+ public Write withMaxRequests(long requests) {
Review Comment:
Why did we name this maxRequests? Shouldn't we align with Veneer and use max
outstanding elements instead?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -829,56 +1043,93 @@ public final String toString() {
PCollection<KV<ByteString, Iterable<Mutation>>>,
PCollection<BigtableWriteResult>> {
private final BigtableConfig bigtableConfig;
+ private final BigtableWriteOptions bigtableWriteOptions;
- WriteWithResults(BigtableConfig bigtableConfig) {
+ private final BigtableServiceFactory factory;
+
+ WriteWithResults(
+ BigtableConfig bigtableConfig,
+ BigtableWriteOptions bigtableWriteOptions,
+ BigtableServiceFactory factory) {
this.bigtableConfig = bigtableConfig;
+ this.bigtableWriteOptions = bigtableWriteOptions;
+ this.factory = factory;
}
@Override
public PCollection<BigtableWriteResult> expand(
PCollection<KV<ByteString, Iterable<Mutation>>> input) {
bigtableConfig.validate();
+ bigtableWriteOptions.validate();
- return input.apply(ParDo.of(new BigtableWriterFn(bigtableConfig)));
+ return input.apply(
+ ParDo.of(new BigtableWriterFn(factory, bigtableConfig,
bigtableWriteOptions)));
}
@Override
public void validate(PipelineOptions options) {
- validateTableExists(bigtableConfig, options);
+ validateTableExists(bigtableConfig, bigtableWriteOptions, options);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
bigtableConfig.populateDisplayData(builder);
+ bigtableWriteOptions.populateDisplayData(builder);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(WriteWithResults.class)
.add("config", bigtableConfig)
+ .add("writeOptions", bigtableWriteOptions)
.toString();
}
+
+ private void validateTableExists(
+ BigtableConfig config, BigtableWriteOptions writeOptions,
PipelineOptions options) {
+ if (config.getValidate() && config.isDataAccessible() &&
writeOptions.isDataAccessible()) {
+ String tableId = checkNotNull(writeOptions.getTableId().get());
+ try {
+ checkArgument(
+ factory.checkTableExists(config, options,
writeOptions.getTableId().get()),
+ "Table %s does not exist",
+ tableId);
+ } catch (IOException e) {
+ LOG.warn("Error checking whether table {} exists; proceeding.",
tableId, e);
+ }
+ }
+ }
}
private static class BigtableWriterFn
extends DoFn<KV<ByteString, Iterable<Mutation>>, BigtableWriteResult> {
- BigtableWriterFn(BigtableConfig bigtableConfig) {
+ private BigtableServiceFactory factory;
+ private BigtableServiceFactory.ConfigId id;
Review Comment:
Should these fields be final?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -764,35 +894,119 @@ public Write withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
+ */
+ @VisibleForTesting
+ public Write withEmulator(String emulatorHost) {
+ BigtableConfig config = getBigtableConfig();
+ return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the attempt timeout in
milliseconds for writes.
*
* <p>Does not modify this object.
*/
- Write withBigtableService(BigtableService bigtableService) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+ public Write withAttemptTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "attempt timeout must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setAttemptTimeout(timeoutMs).build())
+ .build();
}
/**
- * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
+ * Returns a new {@link BigtableIO.Write} with the operation timeout in
milliseconds for writes.
*
- * <p>This is used for testing.
+ * <p>Does not modify this object.
+ */
+ public Write withOperationTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "operation timeout must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setOperationTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the retry delay in
milliseconds.
Review Comment:
This is the public surface that end users will interact with; please provide
more details about what the retry delay is, when it's used, and why anyone cares :)
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Configuration for write to Bigtable. */
+@AutoValue
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+abstract class BigtableWriteOptions implements Serializable {
+
+ /** Returns the table id. */
+ abstract @Nullable ValueProvider<String> getTableId();
+
+ /** Returns the attempt timeout for writes. */
+ abstract @Nullable Long getAttemptTimeout();
+
+ /** Returns the operation timeout for writes. */
+ abstract @Nullable Long getOperationTimeout();
Review Comment:
Duration
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -521,28 +563,72 @@ public Read withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Read} that will read using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Read} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
- *
- * <p>Does not modify this object.
*/
@VisibleForTesting
- Read withBigtableService(BigtableService bigtableService) {
+ public Read withEmulator(String emulatorHost) {
BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+ return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
}
/**
- * Returns a new {@link BigtableIO.Read} that will use an official
Bigtable emulator.
+ * Configures the attempt timeout in milliseconds of the reads.
*
- * <p>This is used for testing.
+ * <p>Does not modify this object.
*/
- @VisibleForTesting
- public Read withEmulator(String emulatorHost) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+ public Read withAttemptTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "attempt timeout must be positive");
+ BigtableReadOptions readOptions = getBigtableReadOptions();
+ return toBuilder()
+
.setBigtableReadOptions(readOptions.toBuilder().setAttemptTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Configures the operation timeout in milliseconds of the reads.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withOperationTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "operation timeout must be positive");
+ BigtableReadOptions readOptions = getBigtableReadOptions();
+ return toBuilder()
+
.setBigtableReadOptions(readOptions.toBuilder().setOperationTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Configures the initial retry delay in milliseconds.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withRetryInitialDelay(long initialDelayMs) {
Review Comment:
Duration?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer
settings. */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+class BigtableConfigTranslator {
+
+ /** Translate BigtableConfig and BigtableReadOptions to Veneer settings. */
+ static BigtableDataSettings translateReadToVeneerSettings(
+ @Nonnull BigtableConfig config,
+ @Nonnull BigtableReadOptions options,
+ @Nonnull PipelineOptions pipelineOptions) {
+ BigtableDataSettings.Builder settings = buildBigtableDataSettings(config,
pipelineOptions);
+ return configureReadSettings(settings, options);
+ }
+
+ /** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
+ static BigtableDataSettings translateWriteToVeneerSettings(
+ @Nonnull BigtableConfig config,
+ @Nonnull BigtableWriteOptions options,
+ @Nonnull PipelineOptions pipelineOptions) {
+
+ BigtableDataSettings.Builder settings = buildBigtableDataSettings(config,
pipelineOptions);
+ return configureWriteSettings(settings, options);
+ }
+
+ /** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
+ static BigtableDataSettings translateToVeneerSettings(
+ @Nonnull BigtableConfig config, @Nonnull PipelineOptions
pipelineOptions) {
+
+ return buildBigtableDataSettings(config, pipelineOptions).build();
+ }
+
+ private static BigtableDataSettings.Builder buildBigtableDataSettings(
+ BigtableConfig config, PipelineOptions pipelineOptions) {
+ BigtableDataSettings.Builder dataBuilder;
+ if (!Strings.isNullOrEmpty(config.getEmulatorHost())) {
+ String hostAndPort = config.getEmulatorHost();
+ try {
+ int lastIndexOfCol = hostAndPort.lastIndexOf(":");
+ int port = Integer.parseInt(hostAndPort.substring(lastIndexOfCol + 1));
+ dataBuilder =
+ BigtableDataSettings.newBuilderForEmulator(
+ hostAndPort.substring(0, lastIndexOfCol), port);
+ } catch (NumberFormatException | IndexOutOfBoundsException ex) {
+ throw new RuntimeException("Invalid host/port in BigtableConfig " +
hostAndPort);
+ }
+ } else {
+ dataBuilder = BigtableDataSettings.newBuilder();
+ }
+
+ // Configure target
+ dataBuilder
+ .setProjectId(Objects.requireNonNull(config.getProjectId().get()))
+ .setInstanceId(Objects.requireNonNull(config.getInstanceId().get()));
+ if (config.getAppProfileId() != null
+ && !Strings.isNullOrEmpty(config.getAppProfileId().get())) {
+
dataBuilder.setAppProfileId(Objects.requireNonNull(config.getAppProfileId().get()));
+ }
+
+ if (config.getCredentialFactory() != null) {
+ dataBuilder
+ .stubSettings()
+ .setCredentialsProvider(
+ FixedCredentialsProvider.create(
+ ((GcpOptions)
config.getCredentialFactory()).getGcpCredential()));
+ }
+
+ configureHeaderProvider(dataBuilder.stubSettings(), pipelineOptions);
+
+ return dataBuilder;
+ }
+
+ private static void configureHeaderProvider(
+ StubSettings.Builder<?, ?> stubSettings, PipelineOptions
pipelineOptions) {
+
+ ImmutableMap.Builder<String, String> headersBuilder =
ImmutableMap.<String, String>builder();
+ headersBuilder.putAll(stubSettings.getHeaderProvider().getHeaders());
Review Comment:
What headers are you trying to preserve? Shouldn't this be empty?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -521,28 +563,72 @@ public Read withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Read} that will read using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Read} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
- *
- * <p>Does not modify this object.
*/
@VisibleForTesting
- Read withBigtableService(BigtableService bigtableService) {
+ public Read withEmulator(String emulatorHost) {
BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+ return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
}
/**
- * Returns a new {@link BigtableIO.Read} that will use an official
Bigtable emulator.
+ * Configures the attempt timeout in milliseconds of the reads.
*
- * <p>This is used for testing.
+ * <p>Does not modify this object.
*/
- @VisibleForTesting
- public Read withEmulator(String emulatorHost) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+ public Read withAttemptTimeout(long timeoutMs) {
Review Comment:
should this be a java.time.Duration?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -521,28 +563,72 @@ public Read withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Read} that will read using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Read} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
- *
- * <p>Does not modify this object.
*/
@VisibleForTesting
- Read withBigtableService(BigtableService bigtableService) {
+ public Read withEmulator(String emulatorHost) {
BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+ return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
}
/**
- * Returns a new {@link BigtableIO.Read} that will use an official
Bigtable emulator.
+ * Configures the attempt timeout in milliseconds of the reads.
*
- * <p>This is used for testing.
+ * <p>Does not modify this object.
*/
- @VisibleForTesting
- public Read withEmulator(String emulatorHost) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+ public Read withAttemptTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "attempt timeout must be positive");
+ BigtableReadOptions readOptions = getBigtableReadOptions();
+ return toBuilder()
+
.setBigtableReadOptions(readOptions.toBuilder().setAttemptTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Configures the operation timeout in milliseconds of the reads.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withOperationTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "operation timeout must be positive");
+ BigtableReadOptions readOptions = getBigtableReadOptions();
+ return toBuilder()
+
.setBigtableReadOptions(readOptions.toBuilder().setOperationTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Configures the initial retry delay in milliseconds.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withRetryInitialDelay(long initialDelayMs) {
+ checkArgument(initialDelayMs > 0, "initial delay must be positive");
+ BigtableReadOptions readOptions = getBigtableReadOptions();
+ return toBuilder()
+ .setBigtableReadOptions(
+
readOptions.toBuilder().setRetryInitialDelay(initialDelayMs).build())
+ .build();
+ }
+
+ /**
+ * Configures the delay multiplier.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withRetryDelayMultiplier(double multiplier) {
Review Comment:
Do we need to expose the backoff settings?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer
settings. */
Review Comment:
This class doesn't only focus on BigtableOptions anymore. Its job is now to
take the configuration set on the Write & Read PTransforms and convert it into
BigtableDataSettings.
It also provides backwards compatibility for BigtableOptions, and you should
specify which options we honor.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer
settings. */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+class BigtableConfigTranslator {
+
+ /** Translate BigtableConfig and BigtableReadOptions to Veneer settings. */
+ static BigtableDataSettings translateReadToVeneerSettings(
+ @Nonnull BigtableConfig config,
+ @Nonnull BigtableReadOptions options,
+ @Nonnull PipelineOptions pipelineOptions) {
+ BigtableDataSettings.Builder settings = buildBigtableDataSettings(config,
pipelineOptions);
+ return configureReadSettings(settings, options);
+ }
+
+ /** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
+ static BigtableDataSettings translateWriteToVeneerSettings(
+ @Nonnull BigtableConfig config,
+ @Nonnull BigtableWriteOptions options,
+ @Nonnull PipelineOptions pipelineOptions) {
+
+ BigtableDataSettings.Builder settings = buildBigtableDataSettings(config,
pipelineOptions);
+ return configureWriteSettings(settings, options);
+ }
+
+ /** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
+ static BigtableDataSettings translateToVeneerSettings(
+ @Nonnull BigtableConfig config, @Nonnull PipelineOptions
pipelineOptions) {
+
+ return buildBigtableDataSettings(config, pipelineOptions).build();
+ }
+
+ private static BigtableDataSettings.Builder buildBigtableDataSettings(
+ BigtableConfig config, PipelineOptions pipelineOptions) {
+ BigtableDataSettings.Builder dataBuilder;
+ if (!Strings.isNullOrEmpty(config.getEmulatorHost())) {
+ String hostAndPort = config.getEmulatorHost();
+ try {
+ int lastIndexOfCol = hostAndPort.lastIndexOf(":");
+ int port = Integer.parseInt(hostAndPort.substring(lastIndexOfCol + 1));
+ dataBuilder =
+ BigtableDataSettings.newBuilderForEmulator(
+ hostAndPort.substring(0, lastIndexOfCol), port);
+ } catch (NumberFormatException | IndexOutOfBoundsException ex) {
+ throw new RuntimeException("Invalid host/port in BigtableConfig " +
hostAndPort);
+ }
+ } else {
+ dataBuilder = BigtableDataSettings.newBuilder();
+ }
+
+ // Configure target
+ dataBuilder
+ .setProjectId(Objects.requireNonNull(config.getProjectId().get()))
+ .setInstanceId(Objects.requireNonNull(config.getInstanceId().get()));
+ if (config.getAppProfileId() != null
+ && !Strings.isNullOrEmpty(config.getAppProfileId().get())) {
+
dataBuilder.setAppProfileId(Objects.requireNonNull(config.getAppProfileId().get()));
+ }
+
+ if (config.getCredentialFactory() != null) {
+ dataBuilder
+ .stubSettings()
+ .setCredentialsProvider(
+ FixedCredentialsProvider.create(
+ ((GcpOptions)
config.getCredentialFactory()).getGcpCredential()));
+ }
+
+ configureHeaderProvider(dataBuilder.stubSettings(), pipelineOptions);
+
+ return dataBuilder;
+ }
+
+ private static void configureHeaderProvider(
+ StubSettings.Builder<?, ?> stubSettings, PipelineOptions
pipelineOptions) {
+
+ ImmutableMap.Builder<String, String> headersBuilder =
ImmutableMap.<String, String>builder();
+ headersBuilder.putAll(stubSettings.getHeaderProvider().getHeaders());
+ headersBuilder.put(
+ GrpcUtil.USER_AGENT_KEY.name(),
Objects.requireNonNull(pipelineOptions.getUserAgent()));
+
+
stubSettings.setHeaderProvider(FixedHeaderProvider.create(headersBuilder.build()));
+ }
+
+ private static BigtableDataSettings configureWriteSettings(
+ BigtableDataSettings.Builder settings, BigtableWriteOptions
writeOptions) {
+ BigtableBatchingCallSettings.Builder callSettings =
+ settings.stubSettings().bulkMutateRowsSettings();
+ RetrySettings.Builder retrySettings =
callSettings.getRetrySettings().toBuilder();
+ BatchingSettings.Builder batchingSettings =
callSettings.getBatchingSettings().toBuilder();
+ if (writeOptions.getAttemptTimeout() != null) {
+
retrySettings.setInitialRpcTimeout(Duration.ofMillis(writeOptions.getAttemptTimeout()));
+
+ if (writeOptions.getOperationTimeout() == null) {
+ retrySettings.setTotalTimeout(
+ Duration.ofMillis(
+ Math.max(
+ retrySettings.getTotalTimeout().toMillis(),
writeOptions.getAttemptTimeout())));
Review Comment:
Do you need the if statement here? The next statement will clobber it, so I
think you can skip the if here
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1346,29 +1624,33 @@ public List<ByteKeyRange> getRanges() {
}
public ValueProvider<String> getTableId() {
- return config.getTableId();
+ return readOptions.getTableId();
}
}
private static class BigtableReader extends BoundedReader<Row> {
// Thread-safety: source is protected via synchronization and is only
accessed or modified
// inside a synchronized block (or constructor, which is the same).
private BigtableSource source;
- private BigtableService service;
+
+ private final BigtableServiceFactory factory;
+ private BigtableServiceEntry serviceEntry;
Review Comment:
Nullable
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -656,4 +695,98 @@ public int compareTo(@Nonnull EndPoint o) {
.result();
}
}
+
+ static class BigtableRowProtoAdapter implements
RowAdapter<com.google.bigtable.v2.Row> {
+ @Override
+ public RowBuilder<com.google.bigtable.v2.Row> createRowBuilder() {
+ return new DefaultRowBuilder();
+ }
+
+ @Override
+ public boolean isScanMarkerRow(com.google.bigtable.v2.Row row) {
+ return Objects.equals(row,
com.google.bigtable.v2.Row.getDefaultInstance());
+ }
+
+ @Override
+ public ByteString getKey(com.google.bigtable.v2.Row row) {
+ return row.getKey();
+ }
+
+ private static class DefaultRowBuilder
+ implements RowAdapter.RowBuilder<com.google.bigtable.v2.Row> {
+ private com.google.bigtable.v2.Row.Builder protoBuilder =
+ com.google.bigtable.v2.Row.newBuilder();
+
+ private ByteString currentValue;
+ private Family.Builder lastFamily;
+ private String lastFamilyName;
+ private Column.Builder lastColumn;
+ private ByteString lastColumnName;
+
+ private Cell.Builder lastCell;
Review Comment:
Nullable
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java:
##########
@@ -30,6 +29,7 @@
import java.util.concurrent.CompletionStage;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.values.KV;
+import org.threeten.bp.Duration;
Review Comment:
I think we want to use java.time for Beam; threetenbp is a legacy thing in
Veneer.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory class that caches {@link BigtableService} to share between workers
with the same {@link
+ * BigtableConfig} and read / write options.
Review Comment:
Please add some notes about how configIds are used
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -764,35 +894,119 @@ public Write withoutValidation() {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write using the given
Cloud Bigtable service
- * implementation.
+ * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
*
* <p>This is used for testing.
+ */
+ @VisibleForTesting
+ public Write withEmulator(String emulatorHost) {
+ BigtableConfig config = getBigtableConfig();
+ return
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the attempt timeout in
milliseconds for writes.
*
* <p>Does not modify this object.
*/
- Write withBigtableService(BigtableService bigtableService) {
- BigtableConfig config = getBigtableConfig();
- return
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+ public Write withAttemptTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "attempt timeout must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setAttemptTimeout(timeoutMs).build())
+ .build();
}
/**
- * Returns a new {@link BigtableIO.Write} that will use an official
Bigtable emulator.
+ * Returns a new {@link BigtableIO.Write} with the operation timeout in
milliseconds for writes.
*
- * <p>This is used for testing.
+ * <p>Does not modify this object.
+ */
+ public Write withOperationTimeout(long timeoutMs) {
+ checkArgument(timeoutMs > 0, "operation timeout must be positive");
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+
.setBigtableWriteOptions(options.toBuilder().setOperationTimeout(timeoutMs).build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with the retry delay in
milliseconds.
Review Comment:
Similarly for all of the other new options.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -829,56 +1043,93 @@ public final String toString() {
PCollection<KV<ByteString, Iterable<Mutation>>>,
PCollection<BigtableWriteResult>> {
private final BigtableConfig bigtableConfig;
+ private final BigtableWriteOptions bigtableWriteOptions;
- WriteWithResults(BigtableConfig bigtableConfig) {
+ private final BigtableServiceFactory factory;
+
+ WriteWithResults(
+ BigtableConfig bigtableConfig,
+ BigtableWriteOptions bigtableWriteOptions,
+ BigtableServiceFactory factory) {
this.bigtableConfig = bigtableConfig;
+ this.bigtableWriteOptions = bigtableWriteOptions;
+ this.factory = factory;
}
@Override
public PCollection<BigtableWriteResult> expand(
PCollection<KV<ByteString, Iterable<Mutation>>> input) {
bigtableConfig.validate();
+ bigtableWriteOptions.validate();
- return input.apply(ParDo.of(new BigtableWriterFn(bigtableConfig)));
+ return input.apply(
+ ParDo.of(new BigtableWriterFn(factory, bigtableConfig,
bigtableWriteOptions)));
}
@Override
public void validate(PipelineOptions options) {
- validateTableExists(bigtableConfig, options);
+ validateTableExists(bigtableConfig, bigtableWriteOptions, options);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
bigtableConfig.populateDisplayData(builder);
+ bigtableWriteOptions.populateDisplayData(builder);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(WriteWithResults.class)
.add("config", bigtableConfig)
+ .add("writeOptions", bigtableWriteOptions)
.toString();
}
+
+ private void validateTableExists(
+ BigtableConfig config, BigtableWriteOptions writeOptions,
PipelineOptions options) {
+ if (config.getValidate() && config.isDataAccessible() &&
writeOptions.isDataAccessible()) {
+ String tableId = checkNotNull(writeOptions.getTableId().get());
+ try {
+ checkArgument(
+ factory.checkTableExists(config, options,
writeOptions.getTableId().get()),
+ "Table %s does not exist",
+ tableId);
+ } catch (IOException e) {
+ LOG.warn("Error checking whether table {} exists; proceeding.",
tableId, e);
+ }
+ }
+ }
}
private static class BigtableWriterFn
extends DoFn<KV<ByteString, Iterable<Mutation>>, BigtableWriteResult> {
- BigtableWriterFn(BigtableConfig bigtableConfig) {
+ private BigtableServiceFactory factory;
+ private BigtableServiceFactory.ConfigId id;
+ private BigtableServiceEntry serviceEntry;
Review Comment:
Mark this field @Nullable, and add a comment noting that it is cleared in @Teardown.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]