This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 9ff3deb  [BEAM-9895] Added integration test for SnowflakeIO
     new 70a9399  Merge pull request #12176 from PolideaInternal/BEAM-9895-snowflakeioit
9ff3deb is described below

commit 9ff3debdf2ca205419eb12daa3f37cdabb99067c
Author: Dariusz Aniszewski <dariusz.aniszew...@polidea.com>
AuthorDate: Fri Jul 3 14:33:22 2020 +0200

    [BEAM-9895] Added integration test for SnowflakeIO

    review fixes

    renamed test
---
 .../sdk/io/snowflake/test/BatchSnowflakeIOIT.java | 175 +++++++++++++++++++++
 .../beam/sdk/io/snowflake/test/TestUtils.java     |  38 +++++
 2 files changed, 213 insertions(+)

diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/BatchSnowflakeIOIT.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/BatchSnowflakeIOIT.java
new file mode 100644
index 0000000..bc6ce7d
--- /dev/null
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/BatchSnowflakeIOIT.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.test;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.apache.beam.sdk.io.snowflake.test.TestUtils.SnowflakeIOITPipelineOptions;
+import static org.apache.beam.sdk.io.snowflake.test.TestUtils.getTestRowCsvMapper;
+import static org.apache.beam.sdk.io.snowflake.test.TestUtils.getTestRowDataMapper;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
+import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} on an independent Snowflake
+ * instance.
+ *
+ * <p>This test requires a running instance of Snowflake, configured for your GCP account. Pass in
+ * connection information using PipelineOptions:
+ *
+ * <pre>
+ * ./gradlew -p sdks/java/io/snowflake integrationTest -DintegrationTestPipelineOptions='[
+ * "--serverName=<YOUR SNOWFLAKE SERVER NAME>",
+ * "--username=<USERNAME>",
+ * "--password=<PASSWORD>",
+ * "--database=<DATABASE NAME>",
+ * "--role=<SNOWFLAKE ROLE>",
+ * "--warehouse=<SNOWFLAKE WAREHOUSE NAME>",
+ * "--schema=<SCHEMA NAME>",
+ * "--stagingBucketName=gs://<GCS BUCKET NAME>",
+ * "--storageIntegrationName=<STORAGE INTEGRATION NAME>",
+ * "--numberOfRecords=<1000, 100000, 600000, 5000000>",
+ * "--runner=DataflowRunner",
+ * "--region=<GCP REGION FOR DATAFLOW RUNNER>",
+ * "--project=<GCP_PROJECT>"]'
+ * --tests org.apache.beam.sdk.io.snowflake.test.BatchSnowflakeIOIT
+ * -DintegrationTestRunner=dataflow
+ * </pre>
+ */
+public class BatchSnowflakeIOIT {
+  private static final String tableName = "IOIT";
+
+  private static SnowflakeIO.DataSourceConfiguration dataSourceConfiguration;
+  private static int numberOfRecords;
+  private static String stagingBucketName;
+  private static String storageIntegrationName;
+
+  @Rule public TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public TestPipeline pipelineRead = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws SQLException {
+    SnowflakeIOITPipelineOptions options =
+        readIOTestPipelineOptions(SnowflakeIOITPipelineOptions.class);
+
+    numberOfRecords = options.getNumberOfRecords();
+    stagingBucketName = options.getStagingBucketName();
+    storageIntegrationName = options.getStorageIntegrationName();
+
+    dataSourceConfiguration =
+        SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+            .withDatabase(options.getDatabase())
+            .withRole(options.getRole())
+            .withWarehouse(options.getWarehouse())
+            .withServerName(options.getServerName())
+            .withSchema(options.getSchema());
+
+    TestUtils.runConnectionWithStatement(
+        dataSourceConfiguration.buildDatasource(),
+        String.format(
+            "CREATE OR REPLACE TABLE \"%s\" (\"ID\" INTEGER, \"NAME\" STRING)", tableName));
+  }
+
+  @Test
+  public void testWriteThenRead() {
+    PipelineResult writeResult = runWrite();
+    writeResult.waitUntilFinish();
+
+    PipelineResult readResult = runRead();
+    readResult.waitUntilFinish();
+  }
+
+  @AfterClass
+  public static void teardown() throws Exception {
+    String combinedPath = stagingBucketName + "/**";
+    List<ResourceId> paths =
+        FileSystems.match(combinedPath).metadata().stream()
+            .map(MatchResult.Metadata::resourceId)
+            .collect(Collectors.toList());
+
+    FileSystems.delete(paths, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+
+    TestUtils.runConnectionWithStatement(
+        dataSourceConfiguration.buildDatasource(), String.format("DROP TABLE %s", tableName));
+  }
+
+  private PipelineResult runWrite() {
+
+    pipelineWrite
+        .apply(GenerateSequence.from(0).to(numberOfRecords))
+        .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
+        .apply(
+            SnowflakeIO.<TestRow>write()
+                .withDataSourceConfiguration(dataSourceConfiguration)
+                .withWriteDisposition(WriteDisposition.TRUNCATE)
+                .withUserDataMapper(getTestRowDataMapper())
+                .withTable(tableName)
+                .withStagingBucketName(stagingBucketName)
+                .withStorageIntegrationName(storageIntegrationName));
+
+    return pipelineWrite.run();
+  }
+
+  private PipelineResult runRead() {
+    PCollection<TestRow> namesAndIds =
+        pipelineRead.apply(
+            SnowflakeIO.<TestRow>read()
+                .withDataSourceConfiguration(dataSourceConfiguration)
+                .fromTable(tableName)
+                .withStagingBucketName(stagingBucketName)
+                .withStorageIntegrationName(storageIntegrationName)
+                .withCsvMapper(getTestRowCsvMapper())
+                .withCoder(SerializableCoder.of(TestRow.class)));
+
+    PAssert.thatSingleton(namesAndIds.apply("Count All", Count.globally()))
+        .isEqualTo((long) numberOfRecords);
+
+    PCollection<String> consolidatedHashcode =
+        namesAndIds
+            .apply(ParDo.of(new TestRow.SelectNameFn()))
+            .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
+
+    PAssert.that(consolidatedHashcode)
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRecords));
+
+    return pipelineRead.run();
+  }
+}

diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
index ec458ea..d2671ff 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
@@ -26,12 +26,20 @@ import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
@@ -44,6 +52,26 @@ public class TestUtils {
   private static final String PRIVATE_KEY_FILE_NAME = "test_rsa_key.p8";
   private static final String PRIVATE_KEY_PASSPHRASE = "snowflake";
 
+  public interface SnowflakeIOITPipelineOptions
+      extends IOTestPipelineOptions, SnowflakePipelineOptions {}
+
+  public static ResultSet runConnectionWithStatement(DataSource dataSource, String query)
+      throws SQLException {
+
+    Connection connection = dataSource.getConnection();
+    return runStatement(query, connection);
+  }
+
+  public static ResultSet runStatement(String query, Connection connection) throws SQLException {
+    PreparedStatement statement = connection.prepareStatement(query);
+    try {
+      return statement.executeQuery();
+    } finally {
+      statement.close();
+      connection.close();
+    }
+  }
+
   public static String getPrivateKeyPath(Class klass) {
     ClassLoader classLoader = klass.getClassLoader();
     File file = new File(classLoader.getResource(PRIVATE_KEY_FILE_NAME).getFile());
@@ -78,6 +106,16 @@ public class TestUtils {
     return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new Long[] {recordLine};
   }
 
+  public static SnowflakeIO.CsvMapper<TestRow> getTestRowCsvMapper() {
+    return (SnowflakeIO.CsvMapper<TestRow>)
+        parts -> TestRow.create(Integer.valueOf(parts[0]), parts[1]);
+  }
+
+  public static SnowflakeIO.UserDataMapper<TestRow> getTestRowDataMapper() {
+    return (SnowflakeIO.UserDataMapper<TestRow>)
+        (TestRow element) -> new Object[] {element.id(), element.name()};
+  }
+
   public static class ParseToKv extends DoFn<Long, KV<String, Long>> {
     @ProcessElement
     public void processElement(ProcessContext c) {
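
A note on the new JDBC helpers in TestUtils: runStatement returns its ResultSet only after the finally block has already closed both the statement and the connection, so the returned result is not safe to read. The two callers in BatchSnowflakeIOIT (the CREATE OR REPLACE TABLE and DROP TABLE statements) discard it, which is why the tests work as committed. If the helper is only ever used for DDL like that, a void variant with try-with-resources expresses the intent more directly. A minimal sketch under that assumption (the name executeStatement is illustrative, not part of this commit):

    public static void executeStatement(DataSource dataSource, String query)
        throws SQLException {
      // try-with-resources closes the statement and connection even if execution throws
      try (Connection connection = dataSource.getConnection();
          PreparedStatement statement = connection.prepareStatement(query)) {
        // execute() rather than executeQuery(): DDL produces no result that
        // would need to outlive the close
        statement.execute();
      }
    }

Both setup() and teardown() could then call executeStatement(dataSourceConfiguration.buildDatasource(), ...) without holding a dead ResultSet.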