[ https://issues.apache.org/jira/browse/BEAM-3997?focusedWorklogId=96135&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96135 ]
ASF GitHub Bot logged work on BEAM-3997: ---------------------------------------- Author: ASF GitHub Bot Created on: 27/Apr/18 16:30 Start Date: 27/Apr/18 16:30 Worklog Time Spent: 10m Work Description: iemejia closed pull request #5097: [BEAM-3997] HCatalog integration test URL: https://github.com/apache/beam/pull/5097 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/hcatalog/build.gradle b/sdks/java/io/hcatalog/build.gradle index 4867c43e0d3..ae5999850ba 100644 --- a/sdks/java/io/hcatalog/build.gradle +++ b/sdks/java/io/hcatalog/build.gradle @@ -31,6 +31,15 @@ test { ignoreFailures true } +/* + * We need to rely on manually specifying these evaluationDependsOn to ensure that + * the following projects are evaluated before we evaluate this project. This is because + * we are attempting to reference the "sourceSets.test.output" directly. + * TODO: Swap to generating test artifacts which we can then rely on instead of + * the test outputs directly. 
+ */ +evaluationDependsOn(":beam-sdks-java-io-common") + dependencies { compile library.java.guava shadow project(path: ":beam-sdks-java-core", configuration: "shadow") @@ -52,6 +61,7 @@ dependencies { exclude group: "com.google.protobuf", module: "protobuf-java" } testCompile project(path: ":beam-runners-direct-java", configuration: "shadow") + testCompile project(":beam-sdks-java-io-common").sourceSets.test.output testCompile library.java.commons_io_2x testCompile library.java.junit testCompile library.java.hamcrest_core diff --git a/sdks/java/io/hcatalog/pom.xml b/sdks/java/io/hcatalog/pom.xml index b98176f6f19..5d9a4b60b2d 100644 --- a/sdks/java/io/hcatalog/pom.xml +++ b/sdks/java/io/hcatalog/pom.xml @@ -35,6 +35,124 @@ <apache.commons.version>2.5</apache.commons.version> </properties> + <profiles> + <!-- + This profile invokes PerfKitBenchmarker, which does benchmarking of + the IO ITs. The arguments passed to it allow it to invoke mvn again + with the desired benchmark. + + To invoke this, run: + + mvn verify -Dio-it-suite -pl sdks/java/io/hcatalog + -DpkbLocation="path-to-pkb.py" \ + -DintegrationTestPipelineOptions='[ + "––numberOfRecords=1000", + "––HCatalogMetastoreHostName=hcatalog-metastore", + "––HCatalogHivePort=10000", + "––HCatalogHiveDatabaseName=default", + "––HCatalogHiveUsername=user", + "––HCatalogHivePassword=password" + "]' \ + + For DirectRunner, please use -DforceDirectRunner=true argument + For other runners please check doc in BEAM-3060 and https://beam.apache.org/documentation/io/testing/ + --> + <profile> + <id>io-it-suite</id> + <activation> + <property><name>io-it-suite</name></property> + </activation> + <properties> + <!-- This is based on the location of the current pom relative to the root + See discussion in BEAM-2460 --> + <beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.gmaven</groupId> + 
<artifactId>groovy-maven-plugin</artifactId> + <version>${groovy-maven-plugin.version}</version> + <executions> + <execution> + <id>find-supported-python-for-compile</id> + <phase>initialize</phase> + <goals> + <goal>execute</goal> + </goals> + <configuration> + <source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>${maven-exec-plugin.version}</version> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>${python.interpreter.bin}</executable> + <arguments> + <argument>${pkbLocation}</argument> + <argument>-benchmarks=beam_integration_benchmark</argument> + <argument>-beam_it_profile=io-it</argument> + <argument>-beam_it_timeout=${pkbTimeout}</argument> + <argument>-beam_location=${beamRootProjectDir}</argument> + <argument>-beam_prebuilt=true</argument> + <argument>-beam_sdk=java</argument> + <!-- runner overrides, controlled via forceDirectRunner --> + <argument>${pkbBeamRunnerProfile}</argument> + <argument>${pkbBeamRunnerOption}</argument> + <!-- specific to this IO --> + <argument>-beam_it_module=sdks/java/io/hcatalog</argument> + <argument>-beam_it_class=${ioItClass}</argument> + <!-- arguments typically defined by user --> + <argument>-beam_it_options=${integrationTestPipelineOptions}</argument> + <!-- + optional array of key=value items. It will be passed to + target mvn command by pkb. eg. 
-DpkbExtraProperties='["filesystem=local"]' + --> + <argument>-beam_extra_mvn_properties=${pkbExtraProperties}</argument> + </arguments> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>dataflow-runner</id> + <activation> + <property> + <name>integrationTestRunner</name> + <value>dataflow</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> + <scope>runtime</scope> + </dependency> + </dependencies> + </profile> + </profiles> + <build> <plugins> <plugin> @@ -164,6 +282,17 @@ <scope>test</scope> </dependency> + <!-- It is not used but required for IOIT test. + Check HiveDatabaseTestHelper for usage as driver for jdbc:hive2://url . + Test uses org.apache.hadoop.hive.jdbc.HiveDriver. 
+ --> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <version>${hive.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> @@ -175,10 +304,24 @@ <artifactId>hamcrest-core</artifactId> <scope>test</scope> </dependency> + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOIT.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOIT.java new file mode 100644 index 00000000000..845bbc1dc2f --- /dev/null +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOIT.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.beam.sdk.io.hcatalog; + +import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount; +import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.buildHCatRecords; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A test of {@link org.apache.beam.sdk.io.hcatalog.HCatalogIO} on an independent Hive/HCatalog + * instance. + * + * <p>This test requires a running instance of Hadoop, Hive and HCatalog. Pass in connection + * information using PipelineOptions: + * + * <pre> + * mvn -e -Pio-it verify -pl sdks/java/io/hcatalog -DintegrationTestPipelineOptions='[ + * "--HCatalogMetastoreHostName=hcatalog-metastore", + * "--HCatalogMetastorePort=9083", + * "--HCatalogHivePort=10000", + * "--HCatalogHiveDatabaseName=default", + * "--HCatalogHiveUsername=user", + * "--HCatalogHivePassword=password", + * "--numberOfRecords=1000" ]' + * </pre> + * + * <p>If you want to run this with a runner besides directrunner, there are profiles for dataflow + * and spark in the hcatalog pom. You'll want to activate those in addition to the normal test + * runner invocation pipeline options. 
+ */ +@RunWith(JUnit4.class) +public class HCatalogIOIT { + + private interface HCatalogPipelineOptions extends IOTestPipelineOptions { + @Description("HCatalog metastore host (hostname/ip address)") + @Default.String("hcatalog-metastore") + String getHCatalogMetastoreHostName(); + + void setHCatalogMetastoreHostName(String host); + + @Description("HCatalog metastore port") + @Default.Integer(9083) + Integer getHCatalogMetastorePort(); + + void setHCatalogMetastorePort(Integer port); + + @Description("HCatalog hive port") + @Default.Integer(10000) + Integer getHCatalogHivePort(); + + void setHCatalogHivePort(Integer port); + + @Description("HCatalog hive database") + @Default.String("default") + String getHCatalogHiveDatabaseName(); + + void setHCatalogHiveDatabaseName(String databaseName); + + @Description("HCatalog hive username") + @Default.String("") + String getHCatalogHiveUsername(); + + void setHCatalogHiveUsername(String username); + + @Description("HCatalog hive password") + @Default.String("") + String getHCatalogHivePassword(); + + void setHCatalogHivePassword(String password); + } + + private static final Map<Integer, String> EXPECTED_HASHES = + ImmutableMap.of( + 100, "34c19971bd34cc1ed6218b84d0db3018", + 1000, "2db7f961724848ffcea299075c166ae8", + 10_000, "7885cdda3ed927e17f7db330adcbebcc"); + + private static HiveDatabaseTestHelper helper; + private static Map<String, String> configProperties; + + private static final String testIdentifier = "HCatalogIOIT"; + private static HCatalogPipelineOptions options; + private static String tableName; + + @Rule public TestPipeline pipelineWrite = TestPipeline.create(); + @Rule public TestPipeline pipelineRead = TestPipeline.create(); + + @BeforeClass + public static void setup() throws Exception { + PipelineOptionsFactory.register(HCatalogPipelineOptions.class); + options = TestPipeline.testingPipelineOptions().as(HCatalogPipelineOptions.class); + + final String metastoreUri = + String.format( + 
"thrift://%s:%s", + options.getHCatalogMetastoreHostName(), options.getHCatalogMetastorePort()); + configProperties = ImmutableMap.of("hive.metastore.uris", metastoreUri); + + helper = + new HiveDatabaseTestHelper( + options.getHCatalogMetastoreHostName(), + options.getHCatalogHivePort(), + options.getHCatalogHiveDatabaseName(), + options.getHCatalogHiveUsername(), + options.getHCatalogHivePassword()); + + try { + tableName = helper.createHiveTable(testIdentifier); + } catch (Exception e) { + helper.closeConnection(); + throw new Exception("Problem with creating table for " + testIdentifier + ": " + e, e); + } + } + + @After + public void tearDown() throws Exception { + try { + helper.dropHiveTable(tableName); + } catch (Exception e) { + helper.closeConnection(); + throw new Exception("Problem with deleting table " + tableName + ": " + e, e); + } finally { + helper.closeConnection(); + } + } + + @Test + public void writeAndReadAll() { + pipelineWrite + .apply("Generate sequence", Create.of(buildHCatRecords(options.getNumberOfRecords()))) + .apply( + HCatalogIO.write() + .withConfigProperties(configProperties) + .withDatabase(options.getHCatalogHiveDatabaseName()) + .withTable(tableName)); + pipelineWrite.run().waitUntilFinish(); + + PCollection<String> testRecords = + pipelineRead + .apply( + HCatalogIO.read() + .withConfigProperties(configProperties) + .withDatabase(options.getHCatalogHiveDatabaseName()) + .withTable(tableName)) + .apply(ParDo.of(new CreateHCatFn())); + + PCollection<String> consolidatedHashcode = + testRecords.apply("Calculate hashcode", Combine.globally(new HashingFn())); + + String expectedHash = getHashForRecordCount(options.getNumberOfRecords(), EXPECTED_HASHES); + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); + + pipelineRead.run().waitUntilFinish(); + } + + /** Outputs value stored in the HCatRecord. 
*/ + private static class CreateHCatFn extends DoFn<HCatRecord, String> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().get(0).toString()); + } + } +} diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java index 76e19b55e48..cc57e079dd9 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java @@ -21,9 +21,9 @@ import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_FILTER; import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_RECORDS_COUNT; import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE; +import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.buildHCatRecords; import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getConfigPropertiesAsMap; import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getExpectedRecords; -import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getHCatRecords; import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getReaderContext; import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.insertTestData; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -128,7 +128,7 @@ public static void shutdownEmbeddedMetastoreService () throws Exception { @NeedsEmptyTestTables public void testWriteThenReadSuccess() throws Exception { defaultPipeline - .apply(Create.of(getHCatRecords(TEST_RECORDS_COUNT))) + .apply(Create.of(buildHCatRecords(TEST_RECORDS_COUNT))) .apply( HCatalogIO.write() .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf())) @@ -164,7 +164,7 @@ public void testWriteFailureTableDoesNotExist() throws Exception { 
thrown.expectMessage(containsString("org.apache.hive.hcatalog.common.HCatException")); thrown.expectMessage(containsString("NoSuchObjectException")); defaultPipeline - .apply(Create.of(getHCatRecords(TEST_RECORDS_COUNT))) + .apply(Create.of(buildHCatRecords(TEST_RECORDS_COUNT))) .apply( HCatalogIO.write() .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf())) diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java index 12141b2e929..ba2b0c27f0a 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java @@ -34,7 +34,7 @@ import org.apache.hive.hcatalog.data.transfer.WriteEntity; import org.apache.hive.hcatalog.data.transfer.WriterContext; -/** Utility class for HCatalogIOTest. */ +/** Utility class for HCatalogIOTest and HCatalogIOIT. */ class HCatalogIOTestUtils { static final String TEST_DATABASE = "default"; static final String TEST_TABLE = "mytable"; @@ -58,7 +58,8 @@ private static WriterContext getWriterContext(Map<String, String> config) throws /** Writes records to the table using the passed WriterContext. */ private static void writeRecords(WriterContext context) throws HCatException { - DataTransferFactory.getHCatWriter(context).write(getHCatRecords(TEST_RECORDS_COUNT).iterator()); + DataTransferFactory.getHCatWriter(context) + .write(buildHCatRecords(TEST_RECORDS_COUNT).iterator()); } /** Commits the pending writes to the database. */ @@ -77,7 +78,7 @@ private static void commitRecords(Map<String, String> config, WriterContext cont } /** Returns a list of HCatRecords of passed size. 
*/ - static List<HCatRecord> getHCatRecords(int size) { + static List<HCatRecord> buildHCatRecords(int size) { List<HCatRecord> expected = new ArrayList<>(); for (int i = 0; i < size; i++) { expected.add(toHCatRecord(i)); diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java new file mode 100644 index 00000000000..653f6e21a43 --- /dev/null +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hcatalog; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.beam.sdk.io.common.DatabaseTestHelper; +/** Helper for creating connection and test tables on hive database via JDBC driver. 
*/ + +class HiveDatabaseTestHelper { + private static Connection con; + private static Statement stmt; + + HiveDatabaseTestHelper( + String hiveHost, + Integer hivePort, + String hiveDatabase, + String hiveUsername, + String hivePassword) + throws Exception { + String hiveUrl = String.format("jdbc:hive2://%s:%s/%s", hiveHost, hivePort, hiveDatabase); + con = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword); + stmt = con.createStatement(); + } + + /** Create hive table. */ + String createHiveTable(String testIdentifier) throws Exception { + String tableName = DatabaseTestHelper.getTestTableName(testIdentifier); + stmt.execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)"); + return tableName; + } + + /** Delete hive table. */ + void dropHiveTable(String tableName) throws SQLException { + stmt.execute(" DROP TABLE " + tableName); + } + + void closeConnection() throws Exception { + stmt.close(); + con.close(); + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 96135) Time Spent: 4h 50m (was: 4h 40m) > implement HCatalog integration test > ----------------------------------- > > Key: BEAM-3997 > URL: https://issues.apache.org/jira/browse/BEAM-3997 > Project: Beam > Issue Type: Test > Components: testing > Reporter: Dariusz Aniszewski > Assignee: Jason Kuster > Priority: Major > Time Spent: 4h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)