This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dd5bbb2d77d Add a read timeout and cache BigQueryIOMetadata (#29662)
dd5bbb2d77d is described below
commit dd5bbb2d77d7f3d057baad3d872677671c730345
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Jan 16 09:56:10 2024 +0100
Add a read timeout and cache BigQueryIOMetadata (#29662)
---
.../sdk/extensions/gcp/util/GceMetadataUtil.java | 1 +
.../sdk/io/gcp/bigquery/BigQueryIOMetadata.java | 38 ++++++++++++----------
2 files changed, 21 insertions(+), 18 deletions(-)
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
index fd49b759fd6..e63aa7dc677 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
@@ -44,6 +44,7 @@ public class GceMetadataUtil {
int timeoutMillis = 5000;
final HttpParams httpParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(httpParams, timeoutMillis);
+ HttpConnectionParams.setSoTimeout(httpParams, timeoutMillis);
String ret = "";
try {
HttpClient client = new DefaultHttpClient(httpParams);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
index 9cce436fe35..f8d261d3bf6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
@@ -18,19 +18,25 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Metadata class for BigQueryIO. i.e. to use as BQ job labels. */
final class BigQueryIOMetadata {
- private @Nullable String beamJobId;
+ private final @Nullable String beamJobId;
- private @Nullable String beamJobName;
+ private final @Nullable String beamJobName;
- private @Nullable String beamWorkerId;
+ private final @Nullable String beamWorkerId;
+
+ static final Supplier<BigQueryIOMetadata> INSTANCE =
+ Suppliers.memoizeWithExpiration(() -> refreshInstance(), 5,
TimeUnit.MINUTES);
private BigQueryIOMetadata(
@Nullable String beamJobId, @Nullable String beamJobName, @Nullable
String beamWorkerId) {
@@ -47,25 +53,21 @@ final class BigQueryIOMetadata {
* being used.
*/
public static BigQueryIOMetadata create() {
- String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
- String dataflowJobName = GceMetadataUtil.fetchDataflowJobName();
- String dataflowWorkerId = GceMetadataUtil.fetchDataflowWorkerId();
+ return INSTANCE.get();
+ }
+ private static BigQueryIOMetadata refreshInstance() {
+ String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
// If a Dataflow job id is returned on GCE metadata. Then it means
// this program is running on a Dataflow GCE VM.
- boolean isDataflowRunner = !dataflowJobId.isEmpty();
-
- String beamJobId = null;
- String beamJobName = null;
- String beamWorkerId = null;
- if (isDataflowRunner) {
- if (BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
- beamJobId = dataflowJobId;
- beamJobName = dataflowJobName;
- beamWorkerId = dataflowWorkerId;
- }
+ if (dataflowJobId.isEmpty() ||
!BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
+ return new BigQueryIOMetadata(null, null, null);
}
- return new BigQueryIOMetadata(beamJobId, beamJobName, beamWorkerId);
+
+ return new BigQueryIOMetadata(
+ dataflowJobId,
+ GceMetadataUtil.fetchDataflowJobName(),
+ GceMetadataUtil.fetchDataflowWorkerId());
}
public Map<String, String> addAdditionalJobLabels(Map<String, String>
jobLabels) {