Repository: beam Updated Branches: refs/heads/master 652a919ed -> 2e3c17a35
http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java new file mode 100644 index 0000000..2bdfffa --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.testing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.BigqueryScopes; +import com.google.api.services.bigquery.model.QueryRequest; +import com.google.api.services.bigquery.model.QueryResponse; +import com.google.api.services.bigquery.model.TableCell; +import com.google.api.services.bigquery.model.TableRow; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.testing.SerializableMatcher; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Transport; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A matcher to verify data in BigQuery by processing given query + * and comparing with content's checksum. + * + * <p>Example: + * <pre>{@code [ + * assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum)); + * ]}</pre> + */ +@NotThreadSafe +public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult> + implements SerializableMatcher<PipelineResult> { + private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class); + + // The maximum number of retries to execute a BigQuery RPC + static final int MAX_QUERY_RETRIES = 4; + + // The initial backoff for executing a BigQuery RPC + private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L); + + // The total number of rows in query response to be formatted for debugging purpose + private static final int TOTAL_FORMATTED_ROWS = 20; + + // The backoff factory with initial configs + static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_QUERY_RETRIES) + .withInitialBackoff(INITIAL_BACKOFF); + + private final String applicationName; + private final String projectId; + private final String query; + private final String expectedChecksum; + private String actualChecksum; + private transient QueryResponse response; + + public BigqueryMatcher( + String applicationName, String projectId, String query, String expectedChecksum) { + validateArgument("applicationName", applicationName); + validateArgument("projectId", projectId); + validateArgument("query", query); + validateArgument("expectedChecksum", expectedChecksum); + + this.applicationName = applicationName; + this.projectId = projectId; + this.query = query; + this.expectedChecksum = expectedChecksum; + } + + @Override + protected boolean matchesSafely(PipelineResult pipelineResult) { + LOG.info("Verifying Bigquery data"); + Bigquery bigqueryClient = newBigqueryClient(applicationName); + + // execute query + LOG.debug("Executing query: {}", query); + try { + QueryRequest queryContent = new QueryRequest(); + queryContent.setQuery(query); + + response = queryWithRetries( + bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff()); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedIOException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException("Failed to fetch BigQuery data.", e); + } + + if (!response.getJobComplete()) { + // query job not complete, verification failed + return false; + } else { + // compute checksum + actualChecksum = generateHash(response.getRows()); + LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum); + + return expectedChecksum.equals(actualChecksum); + } + } + + @VisibleForTesting + Bigquery newBigqueryClient(String applicationName) { + HttpTransport transport = Transport.getTransport(); + JsonFactory jsonFactory = Transport.getJsonFactory(); + Credentials credential = getDefaultCredential(); + + return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential)) + .setApplicationName(applicationName) + .build(); + } + + @Nonnull + @VisibleForTesting + QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent, + Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + IOException lastException = null; + do { + if (lastException != null) { + LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException); + } + try { + QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute(); + if (response != null) { + return response; + } else { + lastException = + new IOException("Expected valid response from query job, but received null."); + } + } catch (IOException e) { + // ignore and retry + lastException = e; + } + } while(BackOffUtils.next(sleeper, backOff)); + + throw new RuntimeException( + String.format( + "Unable to get BigQuery response after retrying %d times using query (%s)", + MAX_QUERY_RETRIES, + queryContent.getQuery()), + lastException); + } + + private void validateArgument(String name, String value) { + checkArgument( + !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value); + } + + private Credentials getDefaultCredential() { + GoogleCredentials credential; + try { + credential = GoogleCredentials.getApplicationDefault(); + } catch (IOException e) { + throw new RuntimeException("Failed to get application default credential.", e); + } + + if (credential.createScopedRequired()) { + Collection<String> bigqueryScope = + Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY); + credential = credential.createScoped(bigqueryScope); + } + return credential; + } + + private String generateHash(@Nonnull List<TableRow> rows) { + List<HashCode> rowHashes = Lists.newArrayList(); + for (TableRow row : rows) { + List<String> cellsInOneRow = Lists.newArrayList(); + for (TableCell cell : row.getF()) { + cellsInOneRow.add(Objects.toString(cell.getV())); + Collections.sort(cellsInOneRow); + } + rowHashes.add( + Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8)); + } + return Hashing.combineUnordered(rowHashes).toString(); + } + + @Override + public void describeTo(Description description) { + description + .appendText("Expected checksum is (") + .appendText(expectedChecksum) + .appendText(")"); + } + + @Override + public void describeMismatchSafely(PipelineResult pResult, Description description) { + String info; + if (!response.getJobComplete()) { + // query job not complete + info = String.format("The query job hasn't completed. Got response: %s", response); + } else { + // checksum mismatch + info = String.format("was (%s).%n" + + "\tTotal number of rows are: %d.%n" + + "\tQueried data details:%s", + actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS)); + } + description.appendText(info); + } + + private String formatRows(int totalNumRows) { + StringBuilder samples = new StringBuilder(); + List<TableRow> rows = response.getRows(); + for (int i = 0; i < totalNumRows && i < rows.size(); i++) { + samples.append(String.format("%n\t\t")); + for (TableCell field : rows.get(i).getF()) { + samples.append(String.format("%-10s", field.getV())); + } + } + if (rows.size() > totalNumRows) { + samples.append(String.format("%n\t\t...")); + } + return samples.toString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6f9f7bcd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java new file mode 100644 index 0000000..5fcdce9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.testing; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.QueryRequest; +import com.google.api.services.bigquery.model.QueryResponse; +import com.google.api.services.bigquery.model.TableCell; +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.math.BigInteger; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link BigqueryMatcher}. + */ +@RunWith(JUnit4.class) +public class BigqueryMatcherTest { + private final String appName = "test-app"; + private final String projectId = "test-project"; + private final String query = "test-query"; + + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); + @Mock private Bigquery mockBigqueryClient; + @Mock private Bigquery.Jobs mockJobs; + @Mock private Bigquery.Jobs.Query mockQuery; + @Mock private PipelineResult mockResult; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(mockBigqueryClient.jobs()).thenReturn(mockJobs); + when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery); + } + + @Test + public void testBigqueryMatcherThatSucceeds() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher( + appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(createResponseContainingTestData()); + + assertThat(mockResult, matcher); + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + + @Test + public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "incorrect-checksum")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(createResponseContainingTestData()); + + thrown.expect(AssertionError.class); + thrown.expectMessage("Total number of rows are: 1"); + thrown.expectMessage("abc"); + try { + assertThat(mockResult, matcher); + } finally { + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + } + + @Test + public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "some-checksum")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false)); + + thrown.expect(AssertionError.class); + thrown.expectMessage("The query job hasn't completed."); + thrown.expectMessage("jobComplete=false"); + try { + assertThat(mockResult, matcher); + } finally { + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + } + + @Test + public void testQueryWithRetriesWhenServiceFails() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "some-checksum")); + when(mockQuery.execute()).thenThrow(new IOException()); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to get BigQuery response after retrying"); + try { + matcher.queryWithRetries( + mockBigqueryClient, + new QueryRequest(), + fastClock, + BigqueryMatcher.BACKOFF_FACTORY.backoff()); + } finally { + verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES)) + .query(eq(projectId), eq(new QueryRequest())); + } + } + + @Test + public void testQueryWithRetriesWhenQueryResponseNull() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "some-checksum")); + when(mockQuery.execute()).thenReturn(null); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to get BigQuery response after retrying"); + try { + matcher.queryWithRetries( + mockBigqueryClient, + new QueryRequest(), + fastClock, + BigqueryMatcher.BACKOFF_FACTORY.backoff()); + } finally { + verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES)) + .query(eq(projectId), eq(new QueryRequest())); + } + } + + private QueryResponse createResponseContainingTestData() { + TableCell field1 = new TableCell(); + field1.setV("abc"); + TableCell field2 = new TableCell(); + field2.setV("2"); + TableCell field3 = new TableCell(); + field3.setV("testing BigQuery matcher."); + TableRow row = new TableRow(); + row.setF(Lists.newArrayList(field1, field2, field3)); + + QueryResponse response = new QueryResponse(); + response.setJobComplete(true); + response.setRows(Lists.newArrayList(row)); + response.setTotalRows(BigInteger.ONE); + return response; + } +}