igorbernstein2 commented on code in PR #24015: URL: https://github.com/apache/beam/pull/24015#discussion_r1093422568
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableHBaseVeneeringSettings.java: ########## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable; + +import com.google.api.core.ApiFunction; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StubSettings; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials; +import com.google.cloud.bigtable.Version; +import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings; +import 
com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.config.BigtableOptions; +import com.google.cloud.bigtable.config.CredentialOptions; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings; +import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Deadline; +import io.grpc.ManagedChannelBuilder; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.internal.GrpcUtil; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.initialization.qual.UnderInitialization; +import org.threeten.bp.Duration; + +/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer settings. 
*/ +class BigtableHBaseVeneeringSettings { + private static final String DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT = + "batch-bigtable.googleapis.com:443"; + private static final Duration DEFAULT_UNARY_ATTEMPT_TIMEOUTS = Duration.ofSeconds(20); + private static final Duration DEFAULT_BULK_MUTATE_ATTEMPT_TIMEOUTS = Duration.ofMinutes(6); + + private final BigtableDataSettings dataSettings; + private final BigtableTableAdminSettings tableAdminSettings; + private final BigtableInstanceAdminSettings instanceAdminSettings; + + private final BigtableIOOperationTimeouts clientTimeouts; + + static BigtableHBaseVeneeringSettings create(@Nonnull BigtableOptions options) + throws IOException { + return new BigtableHBaseVeneeringSettings(options); + } + + private BigtableHBaseVeneeringSettings(@Nonnull BigtableOptions options) throws IOException { + // Build configs for veneer + this.clientTimeouts = buildCallSettings(options); + + this.dataSettings = buildBigtableDataSettings(clientTimeouts, options); + this.tableAdminSettings = buildBigtableTableAdminSettings(options); + this.instanceAdminSettings = buildBigtableInstanceAdminSettings(options); + } + + // ************** Getters ************** + /** Utility to convert {@link BigtableOptions} to {@link BigtableDataSettings}. */ + BigtableDataSettings getDataSettings() { + return dataSettings; + } + + /** Utility to convert {@link BigtableOptions} to {@link BigtableTableAdminSettings}. */ + BigtableTableAdminSettings getTableAdminSettings() { + return tableAdminSettings; + } + + BigtableIOOperationTimeouts getOperationTimeouts() { + return clientTimeouts; + } + + /** Utility to convert {@link BigtableOptions} to {@link BigtableInstanceAdminSettings}. 
*/ + BigtableInstanceAdminSettings getInstanceAdminSettings() { + return instanceAdminSettings; + } + + // ************** Private Helpers ************** + private BigtableDataSettings buildBigtableDataSettings( + @UnderInitialization BigtableHBaseVeneeringSettings this, + BigtableIOOperationTimeouts clientTimeouts, + BigtableOptions options) + throws IOException { + BigtableDataSettings.Builder dataBuilder; + + // Configure the Data connection + dataBuilder = BigtableDataSettings.newBuilder(); + if (options.useBatch()) { + configureConnection( + dataBuilder.stubSettings(), DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT, options); + } else { + configureConnection( + dataBuilder.stubSettings(), options.getDataHost() + ":" + options.getPort(), options); + } + configureCredentialProvider(dataBuilder.stubSettings(), options); + configureHeaderProvider(dataBuilder.stubSettings(), options); + + // Configure the target + dataBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId()); + if (options.getAppProfileId() != null) { + dataBuilder.setAppProfileId(options.getAppProfileId()); + } + + // Configure RPCs - this happens in multiple parts: + // - retry settings are configured here + // - timeouts are split into multiple places: + // - timeouts for retries are configured here + // - if USE_TIMEOUTS is explicitly disabled, then an interceptor is added to force all + // deadlines to 6 minutes + configureConnectionCallTimeouts(dataBuilder.stubSettings(), clientTimeouts); + + // Complex RPC method settings + configureBulkMutationSettings( + dataBuilder.stubSettings().bulkMutateRowsSettings(), + clientTimeouts.getBulkMutateTimeouts(), + options); + configureBulkReadRowsSettings( + dataBuilder.stubSettings().bulkReadRowsSettings(), + clientTimeouts.getBulkReadRowsTimeouts(), + options); + configureReadRowsSettings( + dataBuilder.stubSettings().readRowsSettings(), + clientTimeouts.getBulkReadRowsTimeouts(), + options); + + // RPC methods - simple + 
configureNonRetryableCallSettings( + dataBuilder.stubSettings().checkAndMutateRowSettings(), clientTimeouts.getUnaryTimeouts()); + configureNonRetryableCallSettings( + dataBuilder.stubSettings().readModifyWriteRowSettings(), clientTimeouts.getUnaryTimeouts()); + + configureRetryableCallSettings( + dataBuilder.stubSettings().mutateRowSettings(), clientTimeouts.getUnaryTimeouts(), options); + configureRetryableCallSettings( + dataBuilder.stubSettings().readRowSettings(), clientTimeouts.getUnaryTimeouts(), options); + configureRetryableCallSettings( + dataBuilder.stubSettings().sampleRowKeysSettings(), + clientTimeouts.getUnaryTimeouts(), + options); + + return dataBuilder.build(); + } + + private BigtableTableAdminSettings buildBigtableTableAdminSettings( + @UnderInitialization BigtableHBaseVeneeringSettings this, BigtableOptions options) + throws IOException { + BigtableTableAdminSettings.Builder adminBuilder; + + // Configure connection + adminBuilder = BigtableTableAdminSettings.newBuilder(); + configureConnection( + adminBuilder.stubSettings(), options.getAdminHost() + ":" + options.getPort(), options); + configureCredentialProvider(adminBuilder.stubSettings(), options); + + configureHeaderProvider(adminBuilder.stubSettings(), options); + + adminBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId()); + + // timeout/retry settings don't apply to admin operations + // v1 used to use RetryOptions for: + // - createTable + // - getTable + // - listTables + // - deleteTable + // - modifyColumnFamilies + // - dropRowRange + // However admin latencies are very different from data latencies and end users shouldn't need to + // change the defaults + // if it turns out that the timeout & retry behavior needs to be configurable, we will expose + // separate settings + + return adminBuilder.build(); + } + + private BigtableInstanceAdminSettings buildBigtableInstanceAdminSettings( + @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options) + throws IOException { + BigtableInstanceAdminSettings.Builder adminBuilder; + + // Configure connection + adminBuilder = BigtableInstanceAdminSettings.newBuilder(); + configureConnection( + adminBuilder.stubSettings(), options.getAdminHost() + ":" + options.getPort(), options); + configureCredentialProvider(adminBuilder.stubSettings(), options); + + configureHeaderProvider(adminBuilder.stubSettings(), options); + + adminBuilder.setProjectId(options.getProjectId()); + + return adminBuilder.build(); + } Review Comment: I don't think we care about the instance admin API? ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableHBaseVeneeringSettings.java: ########## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable; + +import com.google.api.core.ApiFunction; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StubSettings; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials; +import com.google.cloud.bigtable.Version; +import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.config.BigtableOptions; +import com.google.cloud.bigtable.config.CredentialOptions; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings; +import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Deadline; +import io.grpc.ManagedChannelBuilder; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.internal.GrpcUtil; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.util.Collections; +import java.util.HashSet; 
+import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.initialization.qual.UnderInitialization; +import org.threeten.bp.Duration; + +/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer settings. */ +class BigtableHBaseVeneeringSettings { + private static final String DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT = + "batch-bigtable.googleapis.com:443"; + private static final Duration DEFAULT_UNARY_ATTEMPT_TIMEOUTS = Duration.ofSeconds(20); + private static final Duration DEFAULT_BULK_MUTATE_ATTEMPT_TIMEOUTS = Duration.ofMinutes(6); + + private final BigtableDataSettings dataSettings; + private final BigtableTableAdminSettings tableAdminSettings; + private final BigtableInstanceAdminSettings instanceAdminSettings; + + private final BigtableIOOperationTimeouts clientTimeouts; + + static BigtableHBaseVeneeringSettings create(@Nonnull BigtableOptions options) + throws IOException { + return new BigtableHBaseVeneeringSettings(options); + } + + private BigtableHBaseVeneeringSettings(@Nonnull BigtableOptions options) throws IOException { + // Build configs for veneer + this.clientTimeouts = buildCallSettings(options); + + this.dataSettings = buildBigtableDataSettings(clientTimeouts, options); + this.tableAdminSettings = buildBigtableTableAdminSettings(options); + this.instanceAdminSettings = buildBigtableInstanceAdminSettings(options); + } + + // ************** Getters ************** + /** Utility to convert {@link BigtableOptions} to {@link BigtableDataSettings}. 
*/ + BigtableDataSettings getDataSettings() { + return dataSettings; + } + + /** Utility to convert {@link BigtableOptions} to {@link BigtableTableAdminSettings}. */ + BigtableTableAdminSettings getTableAdminSettings() { + return tableAdminSettings; + } + + BigtableIOOperationTimeouts getOperationTimeouts() { + return clientTimeouts; + } + + /** Utility to convert {@link BigtableOptions} to {@link BigtableInstanceAdminSettings}. */ + BigtableInstanceAdminSettings getInstanceAdminSettings() { + return instanceAdminSettings; + } + + // ************** Private Helpers ************** + private BigtableDataSettings buildBigtableDataSettings( + @UnderInitialization BigtableHBaseVeneeringSettings this, + BigtableIOOperationTimeouts clientTimeouts, + BigtableOptions options) + throws IOException { + BigtableDataSettings.Builder dataBuilder; + + // Configure the Data connection + dataBuilder = BigtableDataSettings.newBuilder(); + if (options.useBatch()) { + configureConnection( + dataBuilder.stubSettings(), DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT, options); + } else { + configureConnection( + dataBuilder.stubSettings(), options.getDataHost() + ":" + options.getPort(), options); + } + configureCredentialProvider(dataBuilder.stubSettings(), options); + configureHeaderProvider(dataBuilder.stubSettings(), options); + + // Configure the target + dataBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId()); + if (options.getAppProfileId() != null) { + dataBuilder.setAppProfileId(options.getAppProfileId()); + } + + // Configure RPCs - this happens in multiple parts: + // - retry settings are configured here + // - timeouts are split into multiple places: + // - timeouts for retries are configured here + // - if USE_TIMEOUTS is explicitly disabled, then an interceptor is added to force all + // deadlines to 6 minutes + configureConnectionCallTimeouts(dataBuilder.stubSettings(), clientTimeouts); + + // Complex RPC method settings + 
configureBulkMutationSettings( + dataBuilder.stubSettings().bulkMutateRowsSettings(), + clientTimeouts.getBulkMutateTimeouts(), + options); + configureBulkReadRowsSettings( + dataBuilder.stubSettings().bulkReadRowsSettings(), + clientTimeouts.getBulkReadRowsTimeouts(), + options); + configureReadRowsSettings( + dataBuilder.stubSettings().readRowsSettings(), + clientTimeouts.getBulkReadRowsTimeouts(), + options); + + // RPC methods - simple + configureNonRetryableCallSettings( + dataBuilder.stubSettings().checkAndMutateRowSettings(), clientTimeouts.getUnaryTimeouts()); + configureNonRetryableCallSettings( + dataBuilder.stubSettings().readModifyWriteRowSettings(), clientTimeouts.getUnaryTimeouts()); + + configureRetryableCallSettings( + dataBuilder.stubSettings().mutateRowSettings(), clientTimeouts.getUnaryTimeouts(), options); + configureRetryableCallSettings( + dataBuilder.stubSettings().readRowSettings(), clientTimeouts.getUnaryTimeouts(), options); + configureRetryableCallSettings( + dataBuilder.stubSettings().sampleRowKeysSettings(), + clientTimeouts.getUnaryTimeouts(), + options); + + return dataBuilder.build(); + } + + private BigtableTableAdminSettings buildBigtableTableAdminSettings( + @UnderInitialization BigtableHBaseVeneeringSettings this, BigtableOptions options) + throws IOException { + BigtableTableAdminSettings.Builder adminBuilder; + + // Configure connection + adminBuilder = BigtableTableAdminSettings.newBuilder(); + configureConnection( + adminBuilder.stubSettings(), options.getAdminHost() + ":" + options.getPort(), options); + configureCredentialProvider(adminBuilder.stubSettings(), options); + + configureHeaderProvider(adminBuilder.stubSettings(), options); + + adminBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId()); + + // timeout/retry settings don't apply to admin operations + // v1 used to use RetryOptions for: + // - createTable + // - getTable + // - listTables + // - deleteTable + // - modifyColumnFamilies 
+ // - dropRowRange + // However admin latencies are very different from data latencies and end users shouldn't need to + // change the defaults + // if it turns out that the timeout & retry behavior needs to be configurable, we will expose + // separate settings + + return adminBuilder.build(); + } + + private BigtableInstanceAdminSettings buildBigtableInstanceAdminSettings( + @UnderInitialization BigtableHBaseVeneeringSettings this, BigtableOptions options) + throws IOException { + BigtableInstanceAdminSettings.Builder adminBuilder; + + // Configure connection + adminBuilder = BigtableInstanceAdminSettings.newBuilder(); + configureConnection( + adminBuilder.stubSettings(), options.getAdminHost() + ":" + options.getPort(), options); + configureCredentialProvider(adminBuilder.stubSettings(), options); + + configureHeaderProvider(adminBuilder.stubSettings(), options); + + adminBuilder.setProjectId(options.getProjectId()); + + return adminBuilder.build(); + } + + @SuppressWarnings("rawtypes") + private void configureConnection( + @UnderInitialization BigtableHBaseVeneeringSettings this, + StubSettings.Builder<?, ?> stubSettings, + String endpoint, + BigtableOptions options) { + final InstantiatingGrpcChannelProvider.Builder channelProvider = + ((InstantiatingGrpcChannelProvider) stubSettings.getTransportChannelProvider()).toBuilder(); + + stubSettings.setEndpoint(endpoint); + + if (options.usePlaintextNegotiation()) { + // Make sure to avoid clobbering the old Configurator + @SuppressWarnings("rawtypes") + final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> prevConfigurator = + channelProvider.getChannelConfigurator(); + //noinspection rawtypes + channelProvider.setChannelConfigurator( + new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() { + @Override + public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) { + if (prevConfigurator != null) { + channelBuilder = prevConfigurator.apply(channelBuilder); + } + return 
channelBuilder.usePlaintext(); + } + }); + } + + channelProvider.setPoolSize(options.getChannelCount()); + + stubSettings.setTransportChannelProvider(channelProvider.build()); + } + + private void configureHeaderProvider( + @UnderInitialization BigtableHBaseVeneeringSettings this, + StubSettings.Builder<?, ?> stubSettings, + BigtableOptions options) { + + ImmutableMap.Builder<String, String> headersBuilder = ImmutableMap.<String, String>builder(); + List<String> userAgentParts = Lists.newArrayList(); + userAgentParts.add("bigtable-" + Version.VERSION); + userAgentParts.add("jdk-" + System.getProperty("java.specification.version")); Review Comment: Do we need to mess with the header in this connector? I can see adding Beam to the user agent, but the rest of it seems a bit extra. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
