igorbernstein2 commented on code in PR #24015:
URL: https://github.com/apache/beam/pull/24015#discussion_r1097543625
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -196,66 +222,123 @@ BigtableService getBigtableService(PipelineOptions
pipelineOptions) {
return getBigtableService();
}
- BigtableOptions.Builder bigtableOptions =
effectiveUserProvidedBigtableOptions();
+ BigtableConfig.Builder config = toBuilder();
- bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+ if (pipelineOptions instanceof GcpOptions) {
+ config.setCredentials(((GcpOptions) pipelineOptions).getGcpCredential());
+ }
- if (bigtableOptions.build().getCredentialOptions().getCredentialType()
- == CredentialOptions.CredentialType.DefaultCredentials) {
- bigtableOptions.setCredentialOptions(
-
CredentialOptions.credential(pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+ try {
+ translateBigtableOptions(config);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return new BigtableServiceImpl(bigtableOptions.build());
+ config.setUserAgent(pipelineOptions.getUserAgent());
+
+ return new BigtableServiceImpl(config.build());
}
boolean isDataAccessible() {
- return getTableId().isAccessible()
- && (getProjectId() == null || getProjectId().isAccessible())
+ return (getProjectId() == null || getProjectId().isAccessible())
&& (getInstanceId() == null || getInstanceId().isAccessible());
}
- private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
- BigtableOptions.Builder effectiveOptions =
- getBigtableOptions() != null
- ? getBigtableOptions().toBuilder()
- : new BigtableOptions.Builder();
+ private void translateBigtableOptions(BigtableConfig.Builder builder) throws
IOException {
+ BigtableOptions.Builder effectiveOptionsBuilder = null;
+
+ if (getBigtableOptions() != null) {
+ effectiveOptionsBuilder = getBigtableOptions().toBuilder();
+ }
if (getBigtableOptionsConfigurator() != null) {
- effectiveOptions =
getBigtableOptionsConfigurator().apply(effectiveOptions);
+ effectiveOptionsBuilder =
getBigtableOptionsConfigurator().apply(BigtableOptions.builder());
}
- // Default option that should be forced in most cases
- effectiveOptions.setUseCachedDataPool(true);
+ if (effectiveOptionsBuilder == null) {
+ return;
+ }
+
+ BigtableOptions effectiveOptions = effectiveOptionsBuilder.build();
- if (getInstanceId() != null) {
- effectiveOptions.setInstanceId(getInstanceId().get());
+ // Todo decided if we should implement cached channel pool
+
+ if (effectiveOptions.getInstanceId() != null && getInstanceId() == null) {
+
builder.setInstanceId(ValueProvider.StaticValueProvider.of(effectiveOptions.getInstanceId()));
}
- if (getProjectId() != null) {
- effectiveOptions.setProjectId(getProjectId().get());
+ if (effectiveOptions.getProjectId() != null && getProjectId() == null) {
+
builder.setProjectId(ValueProvider.StaticValueProvider.of(effectiveOptions.getProjectId()));
}
- if (getEmulatorHost() != null) {
- effectiveOptions.enableEmulator(getEmulatorHost());
- effectiveOptions.setUseCachedDataPool(false);
+ if (!effectiveOptions.getDataHost().equals("bigtable.googleapis.com")
+ && getEmulatorHost() == null) {
+ builder.setEmulatorHost(
+ String.format("%s:%s", effectiveOptions.getDataHost(),
effectiveOptions.getPort()));
Review Comment:
I think looks strange. If someone set the endpoint to
batch-bigtable.googleapis.com or to a test endpoint, then we assume its an
emulator?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -196,66 +222,123 @@ BigtableService getBigtableService(PipelineOptions
pipelineOptions) {
return getBigtableService();
}
- BigtableOptions.Builder bigtableOptions =
effectiveUserProvidedBigtableOptions();
+ BigtableConfig.Builder config = toBuilder();
- bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+ if (pipelineOptions instanceof GcpOptions) {
+ config.setCredentials(((GcpOptions) pipelineOptions).getGcpCredential());
+ }
- if (bigtableOptions.build().getCredentialOptions().getCredentialType()
- == CredentialOptions.CredentialType.DefaultCredentials) {
- bigtableOptions.setCredentialOptions(
-
CredentialOptions.credential(pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+ try {
+ translateBigtableOptions(config);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return new BigtableServiceImpl(bigtableOptions.build());
+ config.setUserAgent(pipelineOptions.getUserAgent());
+
+ return new BigtableServiceImpl(config.build());
}
boolean isDataAccessible() {
- return getTableId().isAccessible()
- && (getProjectId() == null || getProjectId().isAccessible())
+ return (getProjectId() == null || getProjectId().isAccessible())
&& (getInstanceId() == null || getInstanceId().isAccessible());
}
- private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
- BigtableOptions.Builder effectiveOptions =
- getBigtableOptions() != null
- ? getBigtableOptions().toBuilder()
- : new BigtableOptions.Builder();
+ private void translateBigtableOptions(BigtableConfig.Builder builder) throws
IOException {
+ BigtableOptions.Builder effectiveOptionsBuilder = null;
+
+ if (getBigtableOptions() != null) {
+ effectiveOptionsBuilder = getBigtableOptions().toBuilder();
+ }
if (getBigtableOptionsConfigurator() != null) {
- effectiveOptions =
getBigtableOptionsConfigurator().apply(effectiveOptions);
+ effectiveOptionsBuilder =
getBigtableOptionsConfigurator().apply(BigtableOptions.builder());
}
- // Default option that should be forced in most cases
- effectiveOptions.setUseCachedDataPool(true);
+ if (effectiveOptionsBuilder == null) {
+ return;
+ }
+
+ BigtableOptions effectiveOptions = effectiveOptionsBuilder.build();
- if (getInstanceId() != null) {
- effectiveOptions.setInstanceId(getInstanceId().get());
+ // Todo decided if we should implement cached channel pool
+
+ if (effectiveOptions.getInstanceId() != null && getInstanceId() == null) {
+
builder.setInstanceId(ValueProvider.StaticValueProvider.of(effectiveOptions.getInstanceId()));
}
- if (getProjectId() != null) {
- effectiveOptions.setProjectId(getProjectId().get());
+ if (effectiveOptions.getProjectId() != null && getProjectId() == null) {
+
builder.setProjectId(ValueProvider.StaticValueProvider.of(effectiveOptions.getProjectId()));
}
- if (getEmulatorHost() != null) {
- effectiveOptions.enableEmulator(getEmulatorHost());
- effectiveOptions.setUseCachedDataPool(false);
+ if (!effectiveOptions.getDataHost().equals("bigtable.googleapis.com")
+ && getEmulatorHost() == null) {
+ builder.setEmulatorHost(
+ String.format("%s:%s", effectiveOptions.getDataHost(),
effectiveOptions.getPort()));
}
- return effectiveOptions;
+ if (effectiveOptions.getCredentialOptions() != null) {
+ CredentialOptions credOptions = effectiveOptions.getCredentialOptions();
+ switch (credOptions.getCredentialType()) {
+ case DefaultCredentials:
+ GoogleCredentials credentials =
GoogleCredentials.getApplicationDefault();
+ builder.setCredentials(credentials);
+ break;
+ case P12:
+ String keyFile = ((CredentialOptions.P12CredentialOptions)
credOptions).getKeyFile();
+ String serviceAccount =
+ ((CredentialOptions.P12CredentialOptions)
credOptions).getServiceAccount();
+ try {
+ KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+ try (FileInputStream fin = new FileInputStream(keyFile)) {
+ keyStore.load(fin, "notasecret".toCharArray());
+ }
+ PrivateKey privateKey =
+ (PrivateKey) keyStore.getKey("privatekey",
"notasecret".toCharArray());
+
+ if (privateKey == null) {
+ throw new IllegalStateException("private key cannot be null");
+ }
+ builder.setCredentials(
+ ServiceAccountJwtAccessCredentials.newBuilder()
+ .setClientEmail(serviceAccount)
+ .setPrivateKey(privateKey)
+ .build());
+ } catch (GeneralSecurityException exception) {
+ throw new RuntimeException("exception while retrieving
credentials", exception);
+ }
+ break;
+ case SuppliedCredentials:
+ builder.setCredentials(
+ ((CredentialOptions.UserSuppliedCredentialOptions)
credOptions).getCredential());
+ break;
+ case SuppliedJson:
+ CredentialOptions.JsonCredentialsOptions jsonCredentialsOptions =
+ (CredentialOptions.JsonCredentialsOptions) credOptions;
+ synchronized (jsonCredentialsOptions) {
Review Comment:
I think we can drop cred caching for BigtableOtions. If anything we should
add it at a higher level
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigToVeneerSettings.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.core.ApiFunction;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.grpc.ChannelPoolSettings;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.internal.GrpcUtil;
+import java.io.IOException;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer
settings. */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+class BigtableConfigToVeneerSettings {
+ private static final String DEFAULT_DATA_ENDPOINT =
"bigtable.googleapis.com:443";
+ private static final String DEFAULT_ADMIN_ENDPOINT =
"bigtableadmin.googleapis.com:443";
+
+ private final BigtableDataSettings dataSettings;
+ private final BigtableTableAdminSettings tableAdminSettings;
+
+ static BigtableConfigToVeneerSettings create(@Nonnull BigtableConfig config)
throws IOException {
+ return new BigtableConfigToVeneerSettings(config);
+ }
+
+ private BigtableConfigToVeneerSettings(@Nonnull BigtableConfig config)
throws IOException {
+ if (config.getProjectId() == null || config.getInstanceId() == null) {
+ throw new IOException("can't find project or instance id");
+ }
+
+ // Build configs for veneer
+ this.dataSettings = buildBigtableDataSettings(config);
+ this.tableAdminSettings = buildBigtableTableAdminSettings(config);
+ }
+
+ // ************** Getters **************
+ /** Utility to convert {@link BigtableOptions} to {@link
BigtableDataSettings}. */
+ BigtableDataSettings getDataSettings() {
+ return dataSettings;
+ }
+
+ /** Utility to convert {@link BigtableOptions} to {@link
BigtableTableAdminSettings}. */
+ BigtableTableAdminSettings getTableAdminSettings() {
+ return tableAdminSettings;
+ }
+
+ // ************** Private Helpers **************
+ private BigtableDataSettings buildBigtableDataSettings(
+ @UnderInitialization BigtableConfigToVeneerSettings this, BigtableConfig
config)
+ throws IOException {
+ BigtableDataSettings.Builder dataBuilder;
+
+ // Configure the Data connection
+ dataBuilder = BigtableDataSettings.newBuilder();
+ if (config.getEmulatorHost() != null) {
+ configureConnection(
+ dataBuilder.stubSettings(), config,
Objects.requireNonNull(config.getEmulatorHost()));
+ } else {
+ configureConnection(dataBuilder.stubSettings(), config,
DEFAULT_DATA_ENDPOINT);
Review Comment:
dont we want this to be batch-bigtable.googleapis.com?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]