[https://issues.apache.org/jira/browse/BEAM-6311?focusedWorklogId=180871&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180871]
ASF GitHub Bot logged work on BEAM-6311:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Jan/19 21:29
Start Date: 03/Jan/19 21:29
Worklog Time Spent: 10m
Work Description: apilloud commented on pull request #7378: [BEAM-6311] [BEAM-6312] [BEAM-6319] Improve BigQueryClient and BigQueryToTableIT
URL: https://github.com/apache/beam/pull/7378#discussion_r245134037
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java
##########
@@ -394,28 +420,76 @@ public void deleteDataset(String projectId, String datasetId) {
     }
   }
-  public void createNewTable(String projectId, String datasetId, Table newTable) {
-    try {
-      this.bqClient.tables().insert(projectId, datasetId, newTable).execute();
-      LOG.info("Successfully created new table: " + newTable.getId());
-    } catch (Exception e) {
-      LOG.debug("Exceptions caught when creating new table: " + e.getMessage());
-    }
+  public void createNewTable(String projectId, String datasetId, Table newTable)
+      throws IOException, InterruptedException {
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
+    IOException lastException = null;
+    do {
+      if (lastException != null) {
+        LOG.warn("Retrying create table ({}) after exception", newTable.getId(), lastException);
+      }
+      try {
+        Table response = this.bqClient.tables().insert(projectId, datasetId, newTable).execute();
+        if (response != null) {
+          LOG.info("Successfully created new table: " + response.getId());
+          return;
+        } else {
+          lastException =
+              new IOException("Expected valid response from create table job, but received null.");
+        }
+      } catch (IOException e) {
+        // ignore and retry
+        lastException = e;
+      }
+    } while (BackOffUtils.next(sleeper, backoff));
+
+    throw new RuntimeException(
+        String.format(
+            "Unable to get BigQuery response after retrying %d times for table (%s)",
+            MAX_QUERY_RETRIES, newTable.getId()),
+        lastException);
   }
   public void insertDataToTable(
-      String projectId, String datasetId, String tableName, List<Map<String, Object>> rows) {
-    try {
-      List<Rows> dataRows =
-          rows.stream().map(row -> new Rows().setJson(row)).collect(Collectors.toList());
-      this.bqClient
-          .tabledata()
-          .insertAll(
-              projectId, datasetId, tableName, new TableDataInsertAllRequest().setRows(dataRows))
-          .execute();
-      LOG.info("Successfully inserted data into table : " + tableName);
-    } catch (Exception e) {
-      LOG.debug("Exceptions caught when inserting data: " + e.getMessage());
-    }
+      String projectId, String datasetId, String tableName, List<Map<String, Object>> rows)
+      throws IOException, InterruptedException {
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
+    IOException lastException = null;
+    do {
+      if (lastException != null) {
+        LOG.warn("Retrying insert table ({}) after exception", tableName, lastException);
+      }
+      try {
+        List<Rows> dataRows =
+            rows.stream().map(row -> new Rows().setJson(row)).collect(Collectors.toList());
+        TableDataInsertAllResponse response =
+            this.bqClient
+                .tabledata()
+                .insertAll(
+                    projectId,
+                    datasetId,
+                    tableName,
+                    new TableDataInsertAllRequest().setRows(dataRows))
+                .execute();
+        if (response != null
+            && (response.getInsertErrors() == null || response.getInsertErrors().isEmpty())) {
+          LOG.info("Successfully inserted data into table : " + tableName);
+          return;
+        } else {
+          lastException =
+              new IOException("Expected valid response from insert data job, but received null.");
Review comment:
Might want to make a different exception for response.getInsertErrors() so
you can debug that case.
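A minimal, self-contained sketch of the reviewer's suggestion, under stated assumptions. It is not the code from PR #7378: `InsertResponse` stands in for `TableDataInsertAllResponse`, `InsertCall` for the `insertAll(...).execute()` request, and `SleepHook` plus a doubling delay for the `Sleeper`/`BackOff` pair. All of these names and the backoff policy are hypothetical. A null response and a response carrying per-row insert errors raise different exceptions, the call counts as successful only when the response is non-null and its insert-error list is null or empty, and the final exception keeps the last failure as its cause so the insert-errors case stays visible when debugging.

```java
import java.io.IOException;
import java.util.List;

/**
 * Hypothetical sketch (not Beam's BigqueryClient): retry an insertAll-style call
 * and report a null response and rejected rows as different exceptions.
 */
class InsertRetrySketch {

  /** Assumed stand-in for TableDataInsertAllResponse; only the insert errors matter here. */
  static final class InsertResponse {
    final List<String> insertErrors; // may be null when no errors are reported

    InsertResponse(List<String> insertErrors) {
      this.insertErrors = insertErrors;
    }
  }

  /** Assumed stand-in for insertAll(...).execute(). */
  interface InsertCall {
    InsertResponse execute() throws IOException;
  }

  /** Assumed stand-in for a Sleeper, so callers and tests control the waiting. */
  interface SleepHook {
    void sleep(long millis) throws InterruptedException;
  }

  /** Separate exception type so rejected rows are distinguishable from a null response. */
  static final class InsertErrorsException extends IOException {
    InsertErrorsException(List<String> errors) {
      super("Insert data job returned " + errors.size() + " insert error(s): " + errors);
    }
  }

  /** Throws unless the response is non-null and carries no insert errors. */
  static void checkResponse(InsertResponse response) throws IOException {
    if (response == null) {
      throw new IOException("Expected valid response from insert data job, but received null.");
    }
    if (response.insertErrors != null && !response.insertErrors.isEmpty()) {
      throw new InsertErrorsException(response.insertErrors);
    }
  }

  /**
   * Makes up to maxRetries + 1 attempts, doubling the wait between successive attempts.
   * Returns the number of attempts used; on exhaustion, throws with the last failure as cause.
   */
  static int insertWithRetries(
      InsertCall call, int maxRetries, long initialDelayMillis, SleepHook sleeper)
      throws IOException, InterruptedException {
    IOException lastException = null;
    long delayMillis = initialDelayMillis;
    for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
      if (lastException != null) {
        sleeper.sleep(delayMillis); // back off before retrying
        delayMillis *= 2;
      }
      try {
        checkResponse(call.execute());
        return attempt;
      } catch (IOException e) {
        lastException = e; // transport errors, null responses and insert errors alike
      }
    }
    throw new IOException("Insert failed after " + (maxRetries + 1) + " attempts", lastException);
  }
}
```

Whether rows rejected with insert errors should be retried at all is a design choice: the sketch retries them because the quoted diff retries every failure, but a caller could instead rethrow `InsertErrorsException` immediately, since resending the same invalid rows is unlikely to succeed.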
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 180871)
Time Spent: 20m (was: 10m)
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryToTableIT.testStandardQueryWithoutCustom flakey
> -------------------------------------------------------------------------------------------
>
> Key: BEAM-6311
> URL: https://issues.apache.org/jira/browse/BEAM-6311
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Reporter: Andrew Pilloud
> Assignee: Boyuan Zhang
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> https://builds.apache.org/job/beam_PostCommit_Java/2208/testReport/junit/org.apache.beam.sdk.io.gcp.bigquery/BigQueryToTableIT/testStandardQueryWithoutCustom/
> {code}
> java.lang.IllegalArgumentException: BigQuery dataset not found for table "apache-beam-testing:bq_query_to_table_1545853773146_752.output_table" . Please create the dataset before pipeline execution. If the dataset is created by an earlier stage of the pipeline, this validation can be disabled using #withoutValidation.
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.verifyDatasetPresence(BigQueryHelpers.java:491)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.validate(BigQueryIO.java:1559)
> at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:641)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:645)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:577)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:312)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryToTableIT.runBigQueryToTablePipeline(BigQueryToTableIT.java:111)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryToTableIT.testStandardQueryWithoutCustom(BigQueryToTableIT.java:295)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:349)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:314)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:312)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:396)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
> at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
> at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
> at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
> at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
> at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Unable to get dataset: bq_query_to_table_1545853773146_752, aborting after 9 retries.
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.executeWithRetries(BigQueryServicesImpl.java:894)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.getDataset(BigQueryServicesImpl.java:566)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.verifyDatasetPresence(BigQueryHelpers.java:486)
> ... 62 more
> Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
> {
>   "code" : 404,
>   "errors" : [ {
>     "domain" : "global",
>     "message" : "Not found: Dataset apache-beam-testing:bq_query_to_table_1545853773146_752",
>     "reason" : "notFound"
>   } ],
>   "message" : "Not found: Dataset apache-beam-testing:bq_query_to_table_1545853773146_752",
>   "status" : "NOT_FOUND"
> }
> at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:150)
> at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
> at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:401)
> at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1097)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:499)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:549)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.executeWithRetries(BigQueryServicesImpl.java:885)
> ... 64 more
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)