This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 00320aa7793 Merge pull request #17428: [BEAM-14326] Make sure BigQuery
daemon thread doesn't exit suddenly, as this leads to pipeline stuckness
00320aa7793 is described below
commit 00320aa7793ed4322a3fd029864dd300ac8bec00
Author: Reuven Lax <[email protected]>
AuthorDate: Fri Apr 22 10:14:34 2022 -0700
Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread
doesn't exit suddenly, as this leads to pipeline stuckness
---
.../StorageApiDynamicDestinationsTableRow.java | 10 +++++-
.../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 42 ++++++++++++++--------
2 files changed, 36 insertions(+), 16 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 281a7adb529..c48dbe0dedb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import
org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaTooNarrowException;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,10 +75,10 @@ public class StorageApiDynamicDestinationsTableRow<T,
DestinationT>
{
tableSchema = getSchema(destination);
+ TableReference tableReference =
getTable(destination).getTableReference();
if (tableSchema == null) {
// If the table already exists, then try and fetch the schema from
the existing
// table.
- TableReference tableReference =
getTable(destination).getTableReference();
tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
if (tableSchema == null) {
if (createDisposition == CreateDisposition.CREATE_NEVER) {
@@ -95,7 +96,14 @@ public class StorageApiDynamicDestinationsTableRow<T,
DestinationT>
+ "using a create disposition of CREATE_IF_NEEDED.");
}
}
+ } else {
+ // Make sure we register this schema with the cache, unless there's
already a more
+ // up-to-date schema.
+ tableSchema =
+ MoreObjects.firstNonNull(
+ SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema),
tableSchema);
}
+
descriptor =
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
descriptorHash =
BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
index 83246b26de2..e1775af2289 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
@@ -178,6 +178,21 @@ public class TableSchemaCache {
return schemaHolder.map(SchemaHolder::getTableSchema).orElse(null);
}
+ /**
+ * Registers schema for a table if one is not already present. If a schema
is already in the
+ * cache, returns the existing schema, otherwise returns null.
+ */
+ @Nullable
+ public TableSchema putSchemaIfAbsent(TableReference tableReference,
TableSchema tableSchema) {
+ final String key = tableKey(tableReference);
+ Optional<SchemaHolder> existing =
+ runUnderMonitor(
+ () ->
+ Optional.ofNullable(
+ this.cachedSchemas.putIfAbsent(key,
SchemaHolder.of(tableSchema, 0))));
+ return existing.map(SchemaHolder::getTableSchema).orElse(null);
+ }
+
public void refreshSchema(TableReference tableReference, DatasetService
datasetService) {
int targetVersion =
runUnderMonitor(
@@ -187,13 +202,11 @@ public class TableSchemaCache {
"Cannot call refreshSchema after the object has been
stopped!");
}
String key = tableKey(tableReference);
- SchemaHolder schemaHolder = cachedSchemas.get(key);
- if (schemaHolder == null) {
- throw new RuntimeException("Can't refresh unknown schema!");
- }
- tablesToRefresh.put(key, Refresh.of(datasetService,
schemaHolder.getVersion() + 1));
+ @Nullable SchemaHolder schemaHolder = cachedSchemas.get(key);
+ int nextVersion = schemaHolder != null ?
schemaHolder.getVersion() + 1 : 0;
+ tablesToRefresh.put(key, Refresh.of(datasetService,
nextVersion));
// Wait at least until the next version.
- return schemaHolder.getVersion() + 1;
+ return nextVersion;
});
waitForRefresh(tableReference, targetVersion);
}
@@ -237,11 +250,9 @@ public class TableSchemaCache {
.entrySet()
.removeIf(
entry -> {
- SchemaHolder schemaHolder =
cachedSchemas.get(entry.getKey());
- if (schemaHolder == null) {
- throw new RuntimeException("Unexpected null schema for "
+ entry.getKey());
- }
- return schemaHolder.getVersion() >=
entry.getValue().getTargetVersion();
+ @Nullable SchemaHolder schemaHolder =
cachedSchemas.get(entry.getKey());
+ return schemaHolder != null
+ && schemaHolder.getVersion() >=
entry.getValue().getTargetVersion();
});
} finally {
tableUpdateMonitor.leave();
@@ -272,10 +283,11 @@ public class TableSchemaCache {
if (timeRemaining.getMillis() > 0) {
Thread.sleep(timeRemaining.getMillis());
}
- } catch (InterruptedException e) {
- runUnderMonitor(() -> this.stopped = true);
- return;
- } catch (IOException e) {
+ } catch (Exception e) {
+ // Since this is a daemon thread, don't exit until it is explicitly shut
down. Exiting early
+ // can cause the
+ // pipeline to stall.
+ LOG.error("Caught exception in BigQuery's table schema cache refresh
thread: " + e);
}
this.refreshExecutor.submit(this::refreshThread);
}