[ 
https://issues.apache.org/jira/browse/BEAM-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris updated BEAM-7195:
------------------------
    Description: See the following StackOverflow question, which describes the 
detail  (was: I'm experiencing an issue when I run a Beam pipeline (written in 
Java) in my local development environment using the 'direct runner' (for data 
entry into Google BigQuery).

 

I'm processing multiple items of data through a Beam pipeline on separate 
threads. I process the data with a ParDo transform, applied over a PCollection, 
and then attempt to write the transformed data to Google BigQuery.

I'm attempting to write my transformed data into BigQuery with a routine that 
looks like the following: 
{code:java}
transformedData
    .apply("Load fact data",
        BigQueryIO.<ValidatedDataRecord>write()
            .to(new LoadDataFact.DynamicFactTableDestination(dataType.label))
            .withFormatFunction(new LoadDataFact.FactSerializationFn())
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
{code}
 

The significant thing here is that I'm using the following:
{code:java}
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
{code}
... because I want a new table to be created if it doesn't already exist.

 

The issue is that when new data enters the dataflow and that data requires a 
new destination table, I frequently (but not always) get the following 404 
error:
{code:java}
Exception thrown in class : com.myOrg.myPackage.myClass Error :
java.lang.RuntimeException:
com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table gator-acoem-cloud-eu:chris_halcrow_ecotech_com_dev_customer_dev_project.data_staging_iot_state_of_health",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table gator-acoem-cloud-eu:chris_halcrow_ecotech_com_dev_customer_dev_project.data_staging_iot_state_of_health",
  "status" : "NOT_FOUND"
}
{code}
 

What's happening is that when the 1st new item of data is processed, the 
CreateTables.class in the 'Beam SDKs Java Core' library adds an entry to the 
static 'CreateTables.createdTables' collection for the new table, to indicate 
that the table is created (however the table is _in the process_ of being 
created). The 2nd new item of data then comes along, and a check is done on 
whether the new table exists. This check returns true, so the data processing 
continues, but the 404 occurs because the table doesn't _actually_ exist.
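The race described above can be reproduced in isolation with a small simulation (hypothetical class and names, not Beam code): if a "created" marker becomes visible before the slow creation finishes, a second thread skips the create and goes on to write to a table that doesn't exist yet.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simulation of the behaviour described above -- not Beam code.
// The "created" marker is published as soon as creation *starts*, so a second
// thread can skip the create and write to a table that doesn't yet exist.
public class CreateRaceSketch {
    private final Set<String> createdTables =
            Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Set<String> reallyExisting =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    // Mirrors the check-then-act shape of getTableDestination; returns
    // whether the table "really" exists when the caller goes on to write.
    public boolean ensureTable(String tableSpec, long createMillis) {
        if (!createdTables.contains(tableSpec)) {
            synchronized (createdTables) {
                if (!createdTables.contains(tableSpec)) {
                    createdTables.add(tableSpec);   // marker published up front
                    sleepQuietly(createMillis);     // simulates slow creation
                    reallyExisting.add(tableSpec);  // table only now exists
                }
            }
        }
        return reallyExisting.contains(tableSpec);  // false == the 404 window
    }

    // Thread A starts a slow create; thread B then sees the marker, skips the
    // create, and finds the table missing -- the reported 404 scenario.
    public static boolean demoRace() {
        CreateRaceSketch s = new CreateRaceSketch();
        Thread a = new Thread(() -> s.ensureTable("dataset.table", 300));
        a.start();
        sleepQuietly(100);                          // let A publish the marker
        boolean existsForB = s.ensureTable("dataset.table", 0);
        try { a.join(); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return existsForB;
    }

    private static void sleepQuietly(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```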

 

The CreateTables.class is part of the beam-sdks-java-io-google-cloud-platform library:

{{C:\Users\my.username\.m2\repository\org\apache\beam\beam-sdks-java-io-google-cloud-platform\2.5.0\beam-sdks-java-io-google-cloud-platform-2.5.0.jar!\org\apache\beam\sdk\io\gcp\bigquery\CreateTables.class}}

 

The problem occurs in the 'getTableDestination' method:
{code:java}
private TableDestination getTableDestination(
    DoFn<KV<DestinationT, TableRow>, KV<TableDestination, TableRow>>.ProcessContext context,
    DestinationT destination) {
  TableDestination tableDestination = CreateTables.this.dynamicDestinations.getTable(destination);
  Preconditions.checkArgument(tableDestination != null,
      "DynamicDestinations.getTable() may not return null, but %s returned null for destination %s",
      CreateTables.this.dynamicDestinations, destination);
  Preconditions.checkArgument(tableDestination.getTableSpec() != null,
      "DynamicDestinations.getTable() must return a TableDestination with a non-null table spec, but %s returned %s for destination %s, which has a null table spec",
      CreateTables.this.dynamicDestinations, tableDestination, destination);
  TableReference tableReference = tableDestination.getTableReference().clone();
  if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
    tableReference.setProjectId(
        ((BigQueryOptions) context.getPipelineOptions().as(BigQueryOptions.class)).getProject());
    tableDestination = tableDestination.withTableReference(tableReference);
  }

  if (CreateTables.this.createDisposition == CreateDisposition.CREATE_NEVER) {
    return tableDestination;
  } else {
    String tableSpec = BigQueryHelpers.stripPartitionDecorator(tableDestination.getTableSpec());
    if (!CreateTables.createdTables.contains(tableSpec)) {
      synchronized (CreateTables.createdTables) {
        if (!CreateTables.createdTables.contains(tableSpec)) {
          this.tryCreateTable(context, destination, tableDestination, tableSpec);
        }
      }
    }

    return tableDestination;
  }
}
{code}
  

Specifically, the problem occurs within this section:

(createdTables is a static `Set`, declared in CreateTables.class)
{code:java}
private static Set<String> createdTables = Collections.newSetFromMap(new ConcurrentHashMap());
{code}
{code:java}
if (!CreateTables.createdTables.contains(tableSpec)) {
  synchronized(CreateTables.createdTables) {
    if (!CreateTables.createdTables.contains(tableSpec)) {
      this.tryCreateTable(context, destination, tableDestination, tableSpec);
    }
  }
}
{code}
 

The 1st time a piece of data that needs a new table is processed, in the first 
line of the code above:
{code:java}
if (!CreateTables.createdTables.contains(tableSpec)) {
{code}
 'CreateTables.createdTables' doesn't contain the `tableSpec`; however, by the 
time the 2nd line executes, the entry exists:

 
{code:java}
synchronized(CreateTables.createdTables) {
{code}
I don't understand how this occurs, since the 'synchronized' block only appears 
to lock on the static 'createdTables' collection so that the guarded logic runs 
on one thread at a time. At this point the new table hasn't actually been 
created. Since its tableSpec is now in the 'createdTables' collection, however, 
the following line never runs:

 
{code:java}
this.tryCreateTable(context, destination, tableDestination, tableSpec);
{code}
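For comparison, a variant that publishes the tableSpec only *after* creation completes would close this window. Below is a minimal sketch (hypothetical class, not a Beam patch) using ConcurrentHashMap.computeIfAbsent, whose mapping function runs at most once per key and whose result only becomes visible once it returns:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical alternative, not a Beam patch: the tableSpec is recorded only
// once the creation callback has finished, so other threads either run the
// creation themselves or block until it completes -- never in between.
public class SafeTableRegistry {
    private final ConcurrentHashMap<String, Boolean> createdTables = new ConcurrentHashMap<>();

    public void ensureCreated(String tableSpec, Runnable createTable) {
        createdTables.computeIfAbsent(tableSpec, spec -> {
            createTable.run();    // runs at most once per tableSpec
            return Boolean.TRUE;  // mapping published only after creation
        });
    }
}
```

Note this would only close the in-process race; if BigQuery itself makes a new table visible to writes with a delay, a write-side retry would still be needed.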
 

What confuses me further is that when the following line executes:
{code:java}
synchronized(CreateTables.createdTables) {
{code}
I immediately get a message in my IntelliJ console saying that table creation 
is being attempted:
{code:java}
INFO: Trying to create BigQuery table: 
my-gcp-project-name:new-dataset-name.new-table-name
{code}
... and the table is _sometimes_ created, but sometimes isn't. What seems to 
happen though is that the next piece of data comes through for processing, but 
the table often hasn't been created in time, so I get the 404 error.
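Until the underlying race is fixed, one possible stopgap is to retry the write when the 404 occurs, giving the table time to appear. Sketched abstractly below (hypothetical helper, not part of the Beam or BigQuery APIs); in a real pipeline this would wrap whatever call surfaces the GoogleJsonResponseException:

```java
import java.util.function.Supplier;

// Hypothetical stopgap, not a Beam/BigQuery API: retry an operation that can
// fail transiently (e.g. a 404 while the new table is still being created),
// with a bounded attempt count and a fixed backoff between attempts.
public class RetryOnNotFound {
    public static <T> T withRetries(Supplier<T> op, int maxAttempts, long backoffMillis) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e;  // e.g. the 404 "Not found: Table ..." wrapped in a RuntimeException
                try {
                    Thread.sleep(backoffMillis);  // give the table time to appear
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
        throw last;  // all attempts failed; rethrow the final error
    }
}
```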

 

I tried updating my Beam dependencies to v2.12.0, but for some reason the 
dataflow stopped working altogether. Looking at the getTableDestination method 
in that version, it appears to do something very similar anyway, so I'm not 
confident that upgrading would solve the issue (although the new code does 
include a more useful comment explaining the reason for the logic):

 
{code:java}
String tableSpec = BigQueryHelpers.stripPartitionDecorator(tableDestination.getTableSpec());
if (!createdTables.contains(tableSpec)) {
  // Another thread may have succeeded in creating the table in the meanwhile, so
  // check again. This check isn't needed for correctness, but we add it to prevent
  // every thread from attempting a create and overwhelming our BigQuery quota.
  synchronized (createdTables) {
    if (!createdTables.contains(tableSpec)) {
      tryCreateTable(context, destination, tableDestination, tableSpec, kmsKey);
    }
  }
}
return tableDestination;
{code}
Can you confirm if this is a bug? And is there a way I can work around it? 
Here's my pom.xml:

 
{code:java}
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.myOrg.myNamespace</groupId>
  <artifactId>my-artifact-name</artifactId>
  <version>1.0-SNAPSHOT</version>

  <packaging>jar</packaging>

  <properties>
    <beam.version>2.5.0</beam.version>
    <maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
   <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
        <!--This plugin's configuration is used to store Eclipse m2e settings 
only. It has no influence on the Maven build itself.-->
        <plugin>
           <groupId>org.eclipse.m2e</groupId>
           <artifactId>lifecycle-mapping</artifactId>
           <version>1.0.0</version>
           <configuration>
              <lifecycleMappingMetadata>
                 <pluginExecutions>
                    <pluginExecution>
                       <pluginExecutionFilter>
                          <groupId>
                             org.apache.maven.plugins
                          </groupId>
                          <artifactId>
                             maven-compiler-plugin
                          </artifactId>
                          <versionRange>
                             [@maven-compiler-plugin.version@,)
                          </versionRange>
                          <goals>
                             <goal>compile</goal>
                             <goal>testCompile</goal>
                          </goals>
                       </pluginExecutionFilter>
                       <action>
                          <ignore></ignore>
                       </action>
                    </pluginExecution>
                 </pluginExecutions>
              </lifecycleMappingMetadata>
           </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
  
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

  
    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>2.5.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    
    
    <!-- slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.msgpack</groupId>
      <artifactId>msgpack-core</artifactId>
      <version>0.8.16</version>
    </dependency>
  </dependencies>
</project>

{code}
 )
        Summary: BigQuery - 404 errors for 'table not found' when using dynamic 
destinations  (was: BigQuery CreateTables.class - table is added to 
createdTables collection before it's actually created)

> BigQuery - 404 errors for 'table not found' when using dynamic destinations
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-7195
>                 URL: https://issues.apache.org/jira/browse/BEAM-7195
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.5.0
>         Environment: Windows
>            Reporter: Chris
>            Priority: Major
>
> See the following StackOverflow question, which describes the detail



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
