[ 
https://issues.apache.org/jira/browse/BEAM-6206?focusedWorklogId=175489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-175489
 ]

ASF GitHub Bot logged work on BEAM-6206:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Dec/18 17:30
            Start Date: 14/Dec/18 17:30
    Worklog Time Spent: 10m 
      Work Description: swegner closed pull request #7270: [BEAM-6206] Add 
CustomHttpErrors a tool to allow adding custom errors…
URL: https://github.com/apache/beam/pull/7270
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/CustomHttpErrors.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/CustomHttpErrors.java
new file mode 100644
index 000000000000..db46d981400f
--- /dev/null
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/CustomHttpErrors.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An optional component to use with the {@code RetryHttpRequestInitializer} 
in order to provide
+ * custom errors for failing http calls. This class allows you to specify 
custom error messages
+ * which match specific error codes and containing strings in the URL. The 
first matcher to match
+ * the request and response will be used to provide the custom error.
+ *
+ * <p>The intended use case here is to examine one of the logs emitted by a 
failing call made by the
+ * RetryHttpRequestInitializer, and then adding a custom error message which 
matches the URL and
+ * code for it.
+ *
+ * <p>Usage: See more in CustomHttpErrorsTest.
+ *
+ * <pre>{@code
+ * CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
+ * builder.addErrorForCodeAndUrlContains(403,"/tables?", "Custom Error Msg");
+ * CustomHttpErrors customErrors = builder.build();
+ *
+ *
+ * RetryHttpRequestInitializer initializer = ...
+ * initializer.setCustomErrors(customErrors);
+ * }</pre>
+ *
+ * <p>Suggestions for future enhancements to anyone upgrading this file:
+ *
+ * <ul>
+ *   <li>This class is left open for extension, to allow different functions 
for HttpCallMatcher and
+ *       HttpCallCustomError to match and log errors. For example, new 
functionality may include
+ *       matching an error based on the HttpResponse body. Additionally, 
extracting and logging
+ *       strings from the HttpResponse body may make useful functionality.
+ *   <li>Add a methods to add custom errors based on inspecting the contents 
of the HttpRequest and
+ *       HttpResponse
+ *   <li>Be sure to update the HttpRequestWrapper and HttpResponseWrapper with 
any new getters that
+ *       you may use. The wrappers were introduced to add a layer of 
indirection which could be
+ *       mocked mocked out in tests. This was unfortunately needed because 
mockito cannot mock final
+ *       classes and its non trivial to just construct HttpRequest and 
HttpResponse objects.
+ *   <li>Making matchers composable with an AND operator may simplify 
enhancing this code, if
+ *       several different matchers are used.
+ * </ul>
+ *
+ * <p>
+ */
+public class CustomHttpErrors {
+
+  /**
+   * A simple Tuple class for creating a list of HttpResponseMatcher and 
HttpResponseCustomError to
+   * print for the responses.
+   */
+  @AutoValue
+  public abstract static class MatcherAndError implements Serializable {
+    static MatcherAndError create(HttpCallMatcher matcher, HttpCallCustomError 
customError) {
+      return new AutoValue_CustomHttpErrors_MatcherAndError(matcher, 
customError);
+    }
+
+    public abstract HttpCallMatcher getMatcher();
+
+    public abstract HttpCallCustomError getCustomError();
+  }
+
+  /** A Builder which allows building immutable CustomHttpErrors object. */
+  public static class Builder {
+
+    private List<MatcherAndError> matchersAndLogs = new 
ArrayList<MatcherAndError>();
+
+    public CustomHttpErrors build() {
+      return new CustomHttpErrors(this.matchersAndLogs);
+    }
+
+    /** Adds a matcher to log the provided string if the error matches a 
particular status code. */
+    public void addErrorForCode(int statusCode, String errorMessage) {
+      HttpCallMatcher matcher = (req, resp) -> resp.getStatusCode() == 
statusCode;
+      this.matchersAndLogs.add(MatcherAndError.create(matcher, 
simpleErrorMessage(errorMessage)));
+    }
+
+    /**
+     * Adds a matcher to log the provided string if the error matches a 
particular status code and
+     * the url contains a certain string.
+     */
+    public void addErrorForCodeAndUrlContains(
+        int statusCode, String urlContains, String errorMessage) {
+      HttpCallMatcher matcher =
+          (request, response) -> {
+            if (response.getStatusCode() == statusCode
+                && request.getUrl().toString().contains(urlContains)) {
+              return true;
+            }
+            return false;
+          };
+      this.matchersAndLogs.add(MatcherAndError.create(matcher, 
simpleErrorMessage(errorMessage)));
+    }
+
+    private static HttpCallCustomError simpleErrorMessage(String errorMessage) 
{
+      return (request, response) -> {
+        return errorMessage;
+      };
+    }
+  }
+
+  // The list of HttpRequest/Response matchers and functions to generate error 
strings.
+  private List<MatcherAndError> matchersAndLogs = new 
ArrayList<MatcherAndError>();
+
+  private CustomHttpErrors(List<MatcherAndError> matchersAndLogs) {
+    // Deep copy the matchersAndLogs, which allows the builder to be reused.
+    for (MatcherAndError m : matchersAndLogs) {
+      this.matchersAndLogs.add(m);
+    }
+  }
+
+  /** @return The the first custom error for the failing request and response 
to match, or null. */
+  public String getCustomError(HttpRequestWrapper req, HttpResponseWrapper 
res) {
+    for (MatcherAndError m : matchersAndLogs) {
+      if (m.getMatcher().matchResponse(req, res)) {
+        return m.getCustomError().customError(req, res);
+      }
+    }
+    return null;
+  }
+}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpCallCustomError.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpCallCustomError.java
new file mode 100644
index 000000000000..cb95c82141e8
--- /dev/null
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpCallCustomError.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+/** Lambda interface for defining a custom error to log based on an http 
request and response. */
+interface HttpCallCustomError {
+
+  /** @return A string which represents a custom error to be logged for the 
request and response. */
+  String customError(HttpRequestWrapper request, HttpResponseWrapper response);
+}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpCallMatcher.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpCallMatcher.java
new file mode 100644
index 000000000000..2437d4506c6e
--- /dev/null
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpCallMatcher.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+/**
+ * Lambda interface for inspecting an http request and response to match the 
failure and possibly
+ * generate a custom error message with more context.
+ */
+interface HttpCallMatcher {
+
+  /** @return true iff the request and response represent a matching http 
c\all. */
+  boolean matchResponse(HttpRequestWrapper req, HttpResponseWrapper response);
+}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpRequestWrapper.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpRequestWrapper.java
new file mode 100644
index 000000000000..068a5940820f
--- /dev/null
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpRequestWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpRequest;
+
+/**
+ * These wrapper classes are necessary allow mocking out the HttpRequest and 
HttpResponse, since
+ * they are final classes and mockito cannot mock them. Note: There is an 
experimental mockito
+ * feature, but it causes many issues and several tests fail when it is 
enabled.
+ * 
https://stackoverflow.com/questions/14292863/how-to-mock-a-final-class-with-mockito
+ */
+class HttpRequestWrapper {
+
+  private HttpRequest request;
+
+  public HttpRequestWrapper(HttpRequest request) {
+    this.request = request;
+  }
+
+  public GenericUrl getUrl() {
+    return request.getUrl();
+  }
+}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpResponseWrapper.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpResponseWrapper.java
new file mode 100644
index 000000000000..3fdf780cd308
--- /dev/null
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/HttpResponseWrapper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.HttpResponse;
+
+/**
+ * These wrapper classes are necessary allow mocking out the HttpRequest and 
HttpResponse, since
+ * they are final classes and mockito cannot mock them. Note: There is an 
experimental mockito
+ * feature, but it causes many issues and several tests fail when it is 
enabled.
+ * 
https://stackoverflow.com/questions/14292863/how-to-mock-a-final-class-with-mockito
+ */
+class HttpResponseWrapper {
+  private HttpResponse response;
+
+  public HttpResponseWrapper(HttpResponse response) {
+    this.response = response;
+  }
+
+  public int getStatusCode() {
+    return response.getStatusCode();
+  }
+}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
index 92982276c8dc..2df2e60908a8 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
@@ -70,16 +70,19 @@
     private final Set<Integer> ignoredResponseCodes;
     private int ioExceptionRetries;
     private int unsuccessfulResponseRetries;
+    @Nullable private CustomHttpErrors customHttpErrors;
 
     private LoggingHttpBackOffHandler(
         Sleeper sleeper,
         BackOff ioExceptionBackOff,
         BackOff unsucessfulResponseBackOff,
-        Set<Integer> ignoredResponseCodes) {
+        Set<Integer> ignoredResponseCodes,
+        @Nullable CustomHttpErrors customHttpErrors) {
       this.sleeper = sleeper;
       this.ioExceptionBackOff = ioExceptionBackOff;
       this.unsuccessfulResponseBackOff = unsucessfulResponseBackOff;
       this.ignoredResponseCodes = ignoredResponseCodes;
+      this.customHttpErrors = customHttpErrors;
     }
 
     @Override
@@ -126,12 +129,22 @@ public boolean handleResponse(HttpRequest request, 
HttpResponse response, boolea
             response.getStatusCode(),
             request.getUrl());
       } else {
+
         String message =
             "Request failed with code {}, "
                 + "performed {} retries due to IOExceptions, "
                 + "performed {} retries due to unsuccessful status codes, "
                 + "HTTP framework says request {} be retried, "
-                + "(caller responsible for retrying): {}";
+                + "(caller responsible for retrying): {}. {}";
+        String customLogMessage = "";
+        if (customHttpErrors != null) {
+          String error =
+              customHttpErrors.getCustomError(
+                  new HttpRequestWrapper(request), new 
HttpResponseWrapper(response));
+          if (error != null) {
+            customLogMessage = error;
+          }
+        }
         if (ignoredResponseCodes.contains(response.getStatusCode())) {
           // Log ignored response codes at a lower level
           LOG.debug(
@@ -140,7 +153,8 @@ public boolean handleResponse(HttpRequest request, 
HttpResponse response, boolea
               ioExceptionRetries,
               unsuccessfulResponseRetries,
               supportsRetry ? "can" : "cannot",
-              request.getUrl());
+              request.getUrl(),
+              customLogMessage);
         } else {
           LOG.warn(
               message,
@@ -148,7 +162,8 @@ public boolean handleResponse(HttpRequest request, 
HttpResponse response, boolea
               ioExceptionRetries,
               unsuccessfulResponseRetries,
               supportsRetry ? "can" : "cannot",
-              request.getUrl());
+              request.getUrl(),
+              customLogMessage);
         }
       }
       return willRetry;
@@ -173,6 +188,8 @@ private boolean retryOnStatusCode(int statusCode) {
 
   private final HttpResponseInterceptor responseInterceptor; // response 
Interceptor to use
 
+  private CustomHttpErrors customHttpErrors = null;
+
   private final NanoClock nanoClock; // used for testing
 
   private final Sleeper sleeper; // used for testing
@@ -235,7 +252,8 @@ public void initialize(HttpRequest request) throws 
IOException {
             // their default values).
             new 
ExponentialBackOff.Builder().setNanoClock(nanoClock).setMultiplier(2).build(),
             new 
ExponentialBackOff.Builder().setNanoClock(nanoClock).setMultiplier(2).build(),
-            ignoredResponseCodes);
+            ignoredResponseCodes,
+            this.customHttpErrors);
 
     request.setUnsuccessfulResponseHandler(loggingHttpBackOffHandler);
     request.setIOExceptionHandler(loggingHttpBackOffHandler);
@@ -246,6 +264,10 @@ public void initialize(HttpRequest request) throws 
IOException {
     }
   }
 
+  public void setCustomErrors(CustomHttpErrors customErrors) {
+    this.customHttpErrors = customErrors;
+  }
+
   public void setWriteTimeout(int writeTimeout) {
     this.writeTimeout = writeTimeout;
   }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/CustomHttpErrorsTest.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/CustomHttpErrorsTest.java
new file mode 100644
index 000000000000..ddb5a852ed37
--- /dev/null
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/CustomHttpErrorsTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertNull;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.mock;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.json.Json;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for CustomHttpErrorsTest. */
+@RunWith(JUnit4.class)
+public class CustomHttpErrorsTest {
+
+  private static final String BQ_TABLES_LIST_URL =
+      ("http://www.googleapis.com/bigquery/v2/projects/";
+          + "myproject/datasets/mydataset/tables?maxResults=1000");
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  private static MockLowLevelHttpResponse createResponse(int code, String 
body) {
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.addHeader("custom_header", "value");
+    response.setStatusCode(code);
+    response.setContentType(Json.MEDIA_TYPE);
+    response.setContent(body);
+    return response;
+  }
+
+  private HttpRequestWrapper createHttpRequest(String url) throws 
MalformedURLException {
+    HttpRequestWrapper request = mock(HttpRequestWrapper.class);
+    GenericUrl genericUrl = new GenericUrl(new URL(url));
+    given(request.getUrl()).willReturn(genericUrl);
+    return request;
+  }
+
+  private HttpResponseWrapper createHttpResponse(int statusCode) {
+    HttpResponseWrapper response = mock(HttpResponseWrapper.class);
+    given(response.getStatusCode()).willReturn(statusCode);
+    return response;
+  }
+
+  @Test
+  public void testMatchesCode() throws IOException {
+    HttpRequestWrapper request = createHttpRequest(BQ_TABLES_LIST_URL);
+    HttpResponseWrapper response = createHttpResponse(403);
+    HttpCallCustomError mockCustomError = mock(HttpCallCustomError.class);
+
+    CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
+    builder.addErrorForCode(403, "Custom Error Msg");
+    CustomHttpErrors customErrors = builder.build();
+
+    String errorMessage = customErrors.getCustomError(request, response);
+    assertEquals("Custom Error Msg", errorMessage);
+  }
+
+  @Test
+  public void testNotMatchesCode() throws IOException {
+    HttpRequestWrapper request = createHttpRequest(BQ_TABLES_LIST_URL);
+    HttpResponseWrapper response = createHttpResponse(404);
+    HttpCallCustomError mockCustomError = mock(HttpCallCustomError.class);
+
+    CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
+    builder.addErrorForCode(403, "Custom Error Msg");
+
+    CustomHttpErrors customErrors = builder.build();
+
+    String errorMessage = customErrors.getCustomError(request, response);
+    assertNull(errorMessage);
+  }
+
+  @Test
+  public void testMatchesCodeAndUrlContains() throws IOException {
+    HttpRequestWrapper request = createHttpRequest(BQ_TABLES_LIST_URL);
+    HttpResponseWrapper response = createHttpResponse(403);
+    HttpCallCustomError mockCustomError = mock(HttpCallCustomError.class);
+
+    CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
+    builder.addErrorForCodeAndUrlContains(403, "/tables?", "Custom Error Msg");
+    CustomHttpErrors customErrors = builder.build();
+
+    String errorMessage = customErrors.getCustomError(request, response);
+    assertEquals("Custom Error Msg", errorMessage);
+  }
+
+  @Test
+  public void testNotMatchesCodeAndUrlContains() throws IOException {
+    HttpRequestWrapper request = createHttpRequest(BQ_TABLES_LIST_URL);
+    HttpResponseWrapper response = createHttpResponse(404);
+    HttpCallCustomError mockCustomError = mock(HttpCallCustomError.class);
+
+    CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
+    builder.addErrorForCodeAndUrlContains(403, "/doesnotmatch?", "Custom Error 
Msg");
+    CustomHttpErrors customErrors = builder.build();
+
+    String errorMessage = customErrors.getCustomError(request, response);
+    assertNull(errorMessage);
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index f147634f9cab..524eccbeba84 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -63,6 +63,7 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.CustomHttpErrors;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
@@ -906,6 +907,7 @@ private static boolean nextBackOff(Sleeper sleeper, BackOff 
backoff) throws Inte
   private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
     RetryHttpRequestInitializer httpRequestInitializer =
         new RetryHttpRequestInitializer(ImmutableList.of(404));
+    httpRequestInitializer.setCustomErrors(createBigQueryClientCustomErrors());
     httpRequestInitializer.setWriteTimeout(options.getHTTPWriteTimeout());
     return new Bigquery.Builder(
             Transport.getTransport(),
@@ -928,4 +930,18 @@ private static HttpRequestInitializer 
chainHttpRequestInitializer(
           new HttpCredentialsAdapter(credential), httpRequestInitializer);
     }
   }
+
+  public static CustomHttpErrors createBigQueryClientCustomErrors() {
+    CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
+    // 403 errors, to list tables, matching this URL:
+    // http://www.googleapis.com/bigquery/v2/projects/myproject/datasets/
+    //     mydataset/tables?maxResults=1000
+    builder.addErrorForCodeAndUrlContains(
+        403,
+        "/tables?",
+        "The GCP project is most likely exceeding the rate limit on "
+            + "bigquery.tables.list, please find the instructions to increase 
this limit at: "
+            + 
"https://cloud.google.com/service-infrastructure/docs/rate-limiting#configure";);
+    return builder.build();
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 175489)
    Time Spent: 4h  (was: 3h 50m)

> Dataflow template which reads from BigQuery fails if used more than once
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6206
>                 URL: https://issues.apache.org/jira/browse/BEAM-6206
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp, runner-dataflow
>    Affects Versions: 2.8.0
>            Reporter: Neil McCrossin
>            Assignee: Tyler Akidau
>            Priority: Blocker
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> When a pipeline contains a BigQuery read, and when that pipeline is uploaded 
> as a template and the template is run in Cloud Dataflow, it will run 
> successfully the first time, but after that it will fail because it can't 
> find a file in the folder BigQueryExtractTemp (see error message below). If 
> the template is uploaded again it will work again +once only+ and then fail 
> again every time after the first time.
> *Error message:*
>  java.io.FileNotFoundException: No files matched spec: 
> gs://bigquery-bug-report-4539/temp/BigQueryExtractTemp/847a342637a64e73b126ad33f764dcc9/000000000000.avro
> *Steps to reproduce:*
>  1. Create the Beam Word Count sample as described 
> [here|https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven].
> 2. Copy the command line from the section "Run WordCount on the Cloud 
> Dataflow service" and substitute in your own project id and bucket name. Make 
> sure you can run it successfully.
> 3. In the file WordCount.java, add the following lines below the existing 
> import statements:
> {code:java}
> import org.apache.beam.sdk.coders.AvroCoder;
> import org.apache.beam.sdk.coders.DefaultCoder;
> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
> import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
> import org.apache.beam.sdk.transforms.SerializableFunction;
> @DefaultCoder(AvroCoder.class)
> class TestOutput
> { 
> }
> {code}
>  
>  4. In this same file, replace the entire method runWordCount with the 
> following code:
> {code:java}
> static void runWordCount(WordCountOptions options) {
>   Pipeline p = Pipeline.create(options);
>   p.apply("ReadBigQuery", BigQueryIO
>     .read(new SerializableFunction<SchemaAndRecord, TestOutput>() {
>       public TestOutput apply(SchemaAndRecord record) {
>         return new TestOutput();
>       }
>     })
>     .from("bigquery-public-data:stackoverflow.tags")
>   );
>   p.run();
> }
> {code}
> (Note I am using the stackoverflow.tags table for purposes of demonstration 
> because it is public and not too large, but the problem seems to occur for 
> any table).
> 5. Add the following pipeline parameters to the command line that you have 
> been using:
> {code:java}
> --tempLocation=gs://<STORAGE_BUCKET>/temp/
> --templateLocation=gs://<STORAGE_BUCKET>/my-bigquery-dataflow-template
> {code}
> 6. Run the command line so that the template is created.
> 7. Launch the template through the Cloud Console by clicking on "CREATE JOB 
> FROM TEMPLATE". Give it the job name "test-1", choose "Custom Template" at 
> the bottom of the list and browse to the template 
> "my-bigquery-dataflow-template", then press "Run job".
> 8. The job should succeed. But then repeat step 7 and it will fail.
> 9. Repeat steps 6 and 7 and it will work again. Repeat step 7 and it will 
> fail again.
>  
> This bug may be related to BEAM-2058 (just a hunch).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to