igorbernstein2 commented on code in PR #24015:
URL: https://github.com/apache/beam/pull/24015#discussion_r1093429731


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableHBaseVeneeringSettings.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.core.ApiFunction;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.api.gax.rpc.UnaryCallSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.Version;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.Deadline;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer 
settings. */
+class BigtableHBaseVeneeringSettings {
+  private static final String DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT =
+      "batch-bigtable.googleapis.com:443";
+  private static final Duration DEFAULT_UNARY_ATTEMPT_TIMEOUTS = 
Duration.ofSeconds(20);
+  private static final Duration DEFAULT_BULK_MUTATE_ATTEMPT_TIMEOUTS = 
Duration.ofMinutes(6);
+
+  private final BigtableDataSettings dataSettings;
+  private final BigtableTableAdminSettings tableAdminSettings;
+  private final BigtableInstanceAdminSettings instanceAdminSettings;
+
+  private final BigtableIOOperationTimeouts clientTimeouts;
+
+  static BigtableHBaseVeneeringSettings create(@Nonnull BigtableOptions 
options)
+      throws IOException {
+    return new BigtableHBaseVeneeringSettings(options);
+  }
+
+  private BigtableHBaseVeneeringSettings(@Nonnull BigtableOptions options) 
throws IOException {
+    // Build configs for veneer
+    this.clientTimeouts = buildCallSettings(options);
+
+    this.dataSettings = buildBigtableDataSettings(clientTimeouts, options);
+    this.tableAdminSettings = buildBigtableTableAdminSettings(options);
+    this.instanceAdminSettings = buildBigtableInstanceAdminSettings(options);
+  }
+
+  // ************** Getters **************
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableDataSettings}. */
+  BigtableDataSettings getDataSettings() {
+    return dataSettings;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableTableAdminSettings}. */
+  BigtableTableAdminSettings getTableAdminSettings() {
+    return tableAdminSettings;
+  }
+
+  BigtableIOOperationTimeouts getOperationTimeouts() {
+    return clientTimeouts;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableInstanceAdminSettings}. */
+  BigtableInstanceAdminSettings getInstanceAdminSettings() {
+    return instanceAdminSettings;
+  }
+
+  // ************** Private Helpers **************
+  private BigtableDataSettings buildBigtableDataSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      BigtableIOOperationTimeouts clientTimeouts,
+      BigtableOptions options)
+      throws IOException {
+    BigtableDataSettings.Builder dataBuilder;
+
+    // Configure the Data connection
+    dataBuilder = BigtableDataSettings.newBuilder();
+    if (options.useBatch()) {
+      configureConnection(
+          dataBuilder.stubSettings(), DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT, 
options);
+    } else {
+      configureConnection(
+          dataBuilder.stubSettings(), options.getDataHost() + ":" + 
options.getPort(), options);
+    }
+    configureCredentialProvider(dataBuilder.stubSettings(), options);
+    configureHeaderProvider(dataBuilder.stubSettings(), options);
+
+    // Configure the target
+    
dataBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+    if (options.getAppProfileId() != null) {
+      dataBuilder.setAppProfileId(options.getAppProfileId());
+    }
+
+    // Configure RPCs - this happens in multiple parts:
+    // - retry settings are configured here
+    // - timeouts are split into multiple places:
+    //   - timeouts for retries are configured here
+    //   - if USE_TIMEOUTS is explicitly disabled, then an interceptor is 
added to force all
+    // deadlines to 6 minutes
+    configureConnectionCallTimeouts(dataBuilder.stubSettings(), 
clientTimeouts);
+
+    // Complex RPC method settings
+    configureBulkMutationSettings(
+        dataBuilder.stubSettings().bulkMutateRowsSettings(),
+        clientTimeouts.getBulkMutateTimeouts(),
+        options);
+    configureBulkReadRowsSettings(
+        dataBuilder.stubSettings().bulkReadRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+    configureReadRowsSettings(
+        dataBuilder.stubSettings().readRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+
+    // RPC methods - simple
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().checkAndMutateRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().readModifyWriteRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().mutateRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().readRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().sampleRowKeysSettings(),
+        clientTimeouts.getUnaryTimeouts(),
+        options);
+
+    return dataBuilder.build();
+  }
+
+  private BigtableTableAdminSettings buildBigtableTableAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableTableAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableTableAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    
adminBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+
+    // timeout/retry settings don't apply to admin operations
+    // v1 used to use RetryOptions for:
+    // - createTable
+    // - getTable
+    // - listTables
+    // - deleteTable
+    // - modifyColumnFamilies
+    // - dropRowRange
+    // However admin latencies are very different from data latencies and end 
users shouldn't need to
+    // change the defaults
+    // if it turns out that the timeout & retry behavior needs to be 
configurable, we will expose
+    // separate settings
+
+    return adminBuilder.build();
+  }
+
+  private BigtableInstanceAdminSettings buildBigtableInstanceAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableInstanceAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableInstanceAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    adminBuilder.setProjectId(options.getProjectId());
+
+    return adminBuilder.build();
+  }
+
+  @SuppressWarnings("rawtypes")
+  private void configureConnection(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      StubSettings.Builder<?, ?> stubSettings,
+      String endpoint,
+      BigtableOptions options) {
+    final InstantiatingGrpcChannelProvider.Builder channelProvider =
+        ((InstantiatingGrpcChannelProvider) 
stubSettings.getTransportChannelProvider()).toBuilder();
+
+    stubSettings.setEndpoint(endpoint);
+
+    if (options.usePlaintextNegotiation()) {
+      // Make sure to avoid clobbering the old Configurator
+      @SuppressWarnings("rawtypes")
+      final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> 
prevConfigurator =
+          channelProvider.getChannelConfigurator();
+      //noinspection rawtypes
+      channelProvider.setChannelConfigurator(
+          new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
+            @Override
+            public ManagedChannelBuilder apply(ManagedChannelBuilder 
channelBuilder) {
+              if (prevConfigurator != null) {
+                channelBuilder = prevConfigurator.apply(channelBuilder);
+              }
+              return channelBuilder.usePlaintext();
+            }
+          });
+    }
+
+    channelProvider.setPoolSize(options.getChannelCount());
+
+    stubSettings.setTransportChannelProvider(channelProvider.build());
+  }
+
+  private void configureHeaderProvider(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      StubSettings.Builder<?, ?> stubSettings,
+      BigtableOptions options) {
+
+    ImmutableMap.Builder<String, String> headersBuilder = 
ImmutableMap.<String, String>builder();
+    List<String> userAgentParts = Lists.newArrayList();
+    userAgentParts.add("bigtable-" + Version.VERSION);
+    userAgentParts.add("jdk-" + 
System.getProperty("java.specification.version"));
+
+    String customUserAgent = options.getUserAgent();
+    if (customUserAgent != null) {
+      userAgentParts.add(customUserAgent);
+    }
+
+    String userAgent = Joiner.on(",").join(userAgentParts);
+    headersBuilder.put(GrpcUtil.USER_AGENT_KEY.name(), userAgent);
+
+    String tracingCookie = options.getTracingCookie();
+    if (tracingCookie != null) {
+      headersBuilder.put("cookie", tracingCookie);
+    }

Review Comment:
   we can probably drop this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to