[
https://issues.apache.org/jira/browse/BEAM-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris updated BEAM-7195:
------------------------
Description: See the following StackOverflow question, which describes the
details (was: I'm experiencing an issue when I run a Beam dataflow (written in
Java) in my local development environment using the direct runner (for data
entry into Google BigQuery).
I'm processing multiple items of data through a Beam pipeline on separate
threads. The data is transformed by a ParDo applied over a PCollection, and I
then attempt to write the transformed data to Google BigQuery.
I'm attempting to write my transformed data into BigQuery with a routine that
looks like the following:
{code:java}
transformedData
    .apply("Load fact data",
        BigQueryIO.<ValidatedDataRecord>write()
            .to(new LoadDataFact.DynamicFactTableDestination(dataType.label))
            .withFormatFunction(new LoadDataFact.FactSerializationFn())
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
{code}
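For reference, my DynamicFactTableDestination is roughly along the lines of the simplified sketch below (the getDatasetName() accessor and the schema fields are illustrative placeholders, not my real code):
{code:java}
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// Illustrative sketch only - not my exact production class.
// ValidatedDataRecord.getDatasetName() and the schema fields are placeholders.
public class DynamicFactTableDestination
    extends DynamicDestinations<ValidatedDataRecord, String> {

  private final String dataTypeLabel;

  public DynamicFactTableDestination(String dataTypeLabel) {
    this.dataTypeLabel = dataTypeLabel;
  }

  @Override
  public String getDestination(ValueInSingleWindow<ValidatedDataRecord> element) {
    // One destination per customer dataset + data type,
    // e.g. "my_dataset.data_staging_iot_state_of_health"
    return element.getValue().getDatasetName() + ".data_staging_" + dataTypeLabel;
  }

  @Override
  public TableDestination getTable(String destination) {
    return new TableDestination(destination, "Fact table for " + dataTypeLabel);
  }

  @Override
  public TableSchema getSchema(String destination) {
    return new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("device_id").setType("STRING"),
        new TableFieldSchema().setName("reading_value").setType("FLOAT")));
  }
}
{code}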
The significant thing in the write above is that I'm using the following:
{code:java}
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
{code}
... because I want a new table to be created if it doesn't already exist.
The issue is that when new data becomes part of the dataflow, and that data
requires a new dataset and table, I frequently (but not always) get the
following 404 error:
{code:java}
Exception thrown in class : com.myOrg.myPackage.myClass Error :
java.lang.RuntimeException:
com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table gator-acoem-cloud-eu:chris_halcrow_ecotech_com_dev_customer_dev_project.data_staging_iot_state_of_health",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table gator-acoem-cloud-eu:chris_halcrow_ecotech_com_dev_customer_dev_project.data_staging_iot_state_of_health",
  "status" : "NOT_FOUND"
}
{code}
What's happening is that when the 1st new item of data is processed, the
CreateTables class in the Beam GCP I/O library adds an entry for the new table
to the static 'CreateTables.createdTables' collection, to indicate that the
table has been created (even though the table is still _in the process_ of
being created). When the 2nd new item of data comes along, a check is done on
whether the new table exists. The check returns true, so processing continues,
but the 404 occurs because the table doesn't _actually_ exist yet.
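To make the timing clearer, here's a small self-contained toy that mimics the sequence I think I'm seeing (this isn't the Beam code; the ordering is my interpretation and the table 'creation' is simulated with a sleep). A second element arriving while the first is still creating the table sees the marker in the set, skips the create, and fails its write:
{code:java}
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy reproduction of the sequence described above - not Beam code.
public class CreatedTablesRace {

  private static final Set<String> createdTables =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
  private static final Set<String> tablesThatActuallyExist =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

  private static void process(String tableSpec) {
    if (!createdTables.contains(tableSpec)) {
      synchronized (createdTables) {
        if (!createdTables.contains(tableSpec)) {
          createdTables.add(tableSpec);           // marker added up front...
          sleepMillis(500);                       // ...while the "create" is still in flight
          tablesThatActuallyExist.add(tableSpec); // creation finally completes
        }
      }
    }
    // The "write": fails if the table doesn't really exist yet.
    if (tablesThatActuallyExist.contains(tableSpec)) {
      System.out.println(Thread.currentThread().getName() + ": write OK");
    } else {
      System.out.println(Thread.currentThread().getName() + ": 404 Not Found (table not created yet)");
    }
  }

  private static void sleepMillis(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread first = new Thread(() -> process("dataset.new_table"), "element-1");
    Thread second = new Thread(() -> process("dataset.new_table"), "element-2");
    first.start();
    Thread.sleep(100); // element-2 arrives while element-1 is still "creating" the table
    second.start();
    first.join();
    second.join();
  }
}
{code}
Running this, element-2 reports the 404 while element-1 eventually reports a successful write, which matches the behaviour I'm seeing in the pipeline.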
The CreateTables class is part of the following Beam GCP I/O library (beam-sdks-java-io-google-cloud-platform):
{{C:\Users\my.username\.m2\repository\org\apache\beam\beam-sdks-java-io-google-cloud-platform\2.5.0\beam-sdks-java-io-google-cloud-platform-2.5.0.jar!\org\apache\beam\sdk\io\gcp\bigquery\CreateTables.class}}
The problem occurs in the 'getTableDestination' method:
{code:java}
private TableDestination getTableDestination(
    DoFn<KV<DestinationT, TableRow>, KV<TableDestination, TableRow>>.ProcessContext context,
    DestinationT destination) {
  TableDestination tableDestination = CreateTables.this.dynamicDestinations.getTable(destination);
  Preconditions.checkArgument(tableDestination != null,
      "DynamicDestinations.getTable() may not return null, but %s returned null for destination %s",
      CreateTables.this.dynamicDestinations, destination);
  Preconditions.checkArgument(tableDestination.getTableSpec() != null,
      "DynamicDestinations.getTable() must return a TableDestination with a non-null table spec, but %s returned %s for destination %s, which has a null table spec",
      CreateTables.this.dynamicDestinations, tableDestination, destination);
  TableReference tableReference = tableDestination.getTableReference().clone();
  if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
    tableReference.setProjectId(
        ((BigQueryOptions) context.getPipelineOptions().as(BigQueryOptions.class)).getProject());
    tableDestination = tableDestination.withTableReference(tableReference);
  }
  if (CreateTables.this.createDisposition == CreateDisposition.CREATE_NEVER) {
    return tableDestination;
  } else {
    String tableSpec = BigQueryHelpers.stripPartitionDecorator(tableDestination.getTableSpec());
    if (!CreateTables.createdTables.contains(tableSpec)) {
      synchronized (CreateTables.createdTables) {
        if (!CreateTables.createdTables.contains(tableSpec)) {
          this.tryCreateTable(context, destination, tableDestination, tableSpec);
        }
      }
    }
    return tableDestination;
  }
}
{code}
Specifically, the problem occurs within this section (createdTables is a static Set, declared in CreateTables.class):
{code:java}
private static Set<String> createdTables = Collections.newSetFromMap(new ConcurrentHashMap());
{code}
{code:java}
if (!CreateTables.createdTables.contains(tableSpec)) {
  synchronized (CreateTables.createdTables) {
    if (!CreateTables.createdTables.contains(tableSpec)) {
      this.tryCreateTable(context, destination, tableDestination, tableSpec);
    }
  }
}
{code}
The 1st time a piece of data that needs a new table is processed, at the first
line of the code above:
{code:java}
if (!CreateTables.createdTables.contains(tableSpec)) {
{code}
'CreateTables.createdTables' doesn't contain the `tableSpec`; however, by the
time execution reaches the 2nd line, it does:
{code:java}
synchronized(CreateTables.createdTables) {
{code}
I don't understand how this occurs, since the 'synchronized' block only seems
to specify that the subsequent logic, which operates on the static
'createdTables' collection, should be locked to a single thread. The new table
hasn't actually been created yet. However, because the table is now present in
the 'createdTables' collection, the following line never runs:
{code:java}
this.tryCreateTable(context, destination, tableDestination, tableSpec);{code}
What confuses me further is that when the following line executes:
{code:java}
synchronized(CreateTables.createdTables) {
{code}
I immediately get a message in my IntelliJ console saying that table creation
is being attempted:
{code:java}
INFO: Trying to create BigQuery table:
my-gcp-project-name:new-dataset-name.new-table-name
{code}
... and the table is _sometimes_ created, but sometimes isn't. What seems to
happen, though, is that the next piece of data comes through for processing,
but the table often hasn't been created in time, so I get the 404 error.
I tried updating my Beam libraries to v2.12.0, but for some reason the
dataflow stopped working altogether. Looking at the code for the
getTableDestination method in that version, it appears to do something very
similar anyway, so I'm not confident that upgrading would solve the issue
(although the new code does have a more useful comment explaining the reason
for the logic):
{code:java}
String tableSpec = BigQueryHelpers.stripPartitionDecorator(tableDestination.getTableSpec());
if (!createdTables.contains(tableSpec)) {
  // Another thread may have succeeded in creating the table in the meanwhile, so
  // check again. This check isn't needed for correctness, but we add it to prevent
  // every thread from attempting a create and overwhelming our BigQuery quota.
  synchronized (createdTables) {
    if (!createdTables.contains(tableSpec)) {
      tryCreateTable(context, destination, tableDestination, tableSpec, kmsKey);
    }
  }
}
return tableDestination;
{code}
Can you confirm if this is a bug? And is there a way I can work around it?
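One workaround I'm considering (an untested sketch using the google-cloud-bigquery client directly; the schema fields here are placeholders, not my real schema) is to pre-create the destination dataset and table before the pipeline starts, so that CREATE_IF_NEEDED never has to create anything mid-flight:
{code:java}
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

public class PreCreateTables {

  // Ensures the destination dataset and table exist before the pipeline runs,
  // so BigQueryIO only ever appends to tables that are already there.
  public static void ensureTableExists(String project, String dataset, String table) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    if (bigquery.getDataset(dataset) == null) {
      bigquery.create(DatasetInfo.of(dataset));
    }

    TableId tableId = TableId.of(project, dataset, table);
    if (bigquery.getTable(tableId) == null) {
      // Placeholder schema - would be replaced with the real fact-table schema.
      Schema schema = Schema.of(
          Field.of("device_id", LegacySQLTypeName.STRING),
          Field.of("reading_value", LegacySQLTypeName.FLOAT));
      bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(schema)));
    }
  }
}
{code}
If pre-creating the tables makes the 404s go away, that would at least confirm the race is in the table creation rather than in the writes themselves.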
Here's my pom.xml:
{code:java}
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License. You may obtain a copy of the License at
        http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.myOrg.myNamespace</groupId>
  <artifactId>my-artifact-name</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <beam.version>2.5.0</beam.version>
    <maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
        <!-- This plugin's configuration is used to store Eclipse m2e settings
             only. It has no influence on the Maven build itself. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[@maven-compiler-plugin.version@,)</versionRange>
                    <goals>
                      <goal>compile</goal>
                      <goal>testCompile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore></ignore>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.msgpack</groupId>
      <artifactId>msgpack-core</artifactId>
      <version>0.8.16</version>
    </dependency>
  </dependencies>
</project>
{code}
)
Summary: BigQuery - 404 errors for 'table not found' when using dynamic
destinations (was: BigQuery CreateTables.class - table is added to
createdTables collection before it's actually created)
> BigQuery - 404 errors for 'table not found' when using dynamic destinations
> ---------------------------------------------------------------------------
>
> Key: BEAM-7195
> URL: https://issues.apache.org/jira/browse/BEAM-7195
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.5.0
> Environment: Windows
> Reporter: Chris
> Priority: Major
>
> See the following StackOverflow question, which describes the details
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)