[
https://issues.apache.org/jira/browse/BEAM-4808?focusedWorklogId=131082&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131082
]
ASF GitHub Bot logged work on BEAM-4808:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Aug/18 21:37
Start Date: 03/Aug/18 21:37
Worklog Time Spent: 10m
Work Description: XuMingmin closed pull request #6006: [BEAM-4808][SQL]
add e2e test for BeamSqlLine.
URL: https://github.com/apache/beam/pull/6006
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/build.gradle b/build.gradle
index d98ce9b3d71..dcfadbbc414 100644
--- a/build.gradle
+++ b/build.gradle
@@ -191,6 +191,7 @@ task javaPostCommit() {
dependsOn ":beam-runners-google-cloud-dataflow-java:postCommit"
dependsOn ":beam-sdks-java-io-google-cloud-platform:postCommit"
dependsOn ":beam-sdks-java-extensions-sql:postCommit"
+ dependsOn ":beam-sdks-java-extensions-sql-jdbc:postCommit"
}
task goPreCommit() {
diff --git a/sdks/java/extensions/sql/jdbc/build.gradle
b/sdks/java/extensions/sql/jdbc/build.gradle
index 46c90e0c3c2..d0a2379bba6 100644
--- a/sdks/java/extensions/sql/jdbc/build.gradle
+++ b/sdks/java/extensions/sql/jdbc/build.gradle
@@ -1,3 +1,5 @@
+import groovy.json.JsonOutput
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -29,6 +31,7 @@ dependencies {
compile "sqlline:sqlline:1.4.0"
compile library.java.slf4j_jdk14
compile library.java.guava
+ testCompile project(path: ":beam-sdks-java-io-google-cloud-platform",
configuration: "shadowTest")
testCompile library.java.junit
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
@@ -71,7 +74,36 @@ task shadowJarTest(type: Test, dependsOn:
":beam-sdks-java-extensions-sql-jdbc:s
useJUnit { }
}
+task endToEndTest(type: Test) {
+  group = "Verification"
+  description = "Runs the BeamSqlLine integration tests (*IT) against GCP."
+
+  def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+  def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/'
+
+  // Never consider this task up-to-date: the ITs exercise external services, so an unchanged
+  // classpath must not cause Gradle to skip them.
+  outputs.upToDateWhen { false }
+
+  def pipelineOptions = [
+    "--project=${gcpProject}",
+    "--tempLocation=${gcsTempRoot}",
+    "--blockOnRun=false"]
+
+  // NOTE(review): assumed to be read by TestPipeline.testingPipelineOptions() in the ITs — confirm.
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)
+
+  // Only integration tests (classes named *IT) run here; unit tests run in shadowJarTest.
+  include '**/*IT.class'
+  classpath = project(":beam-sdks-java-extensions-sql-jdbc").sourceSets.test.runtimeClasspath
+  testClassesDirs = files(project(":beam-sdks-java-extensions-sql-jdbc").sourceSets.test.output.classesDirs)
+  useJUnit { }
+}
+
/* Define a common precommit task which depends on all the individual
precommits. */
task preCommit() {
dependsOn "shadowJarTest"
}
+
+/* Postcommit entry point for this module: runs its end-to-end integration tests. */
+task postCommit() {
+  description = "Various integration tests"
+  group = "Verification"
+  dependsOn endToEndTest
+}
diff --git
a/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineIT.java
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineIT.java
new file mode 100644
index 00000000000..7d83a6de0da
--- /dev/null
+++
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineIT.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.jdbc;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.buildArgs;
+import static
org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.toLines;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
+import org.hamcrest.collection.IsIn;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * BeamSqlLine integration tests.
+ *
+ * <p>Each test starts a SQL query over a Pub/Sub-backed table in the background, waits for the
+ * query to subscribe to the per-test topic, publishes messages, and checks the rows printed by
+ * sqlline.
+ */
+public class BeamSqlLineIT implements Serializable {
+
+  @Rule public transient TestPubsub eventsTopic = TestPubsub.create();
+
+  private static String project;
+  private static String createPubsubTableStatement;
+  private static String setProject;
+  // SimpleDateFormat is not thread-safe; it is only used from the JUnit thread.
+  private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+  // One mapper for all payloads instead of a new instance per message.
+  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
+  private ExecutorService pool;
+
+  @BeforeClass
+  public static void setUpClass() {
+    project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setProject = String.format("SET project = '%s';", project);
+
+    // The LOCATION '%s' placeholder is filled with the per-test topic path in each test.
+    createPubsubTableStatement =
+        "CREATE TABLE taxi_rides (\n"
+            + " event_timestamp TIMESTAMP,\n"
+            + " attributes MAP<VARCHAR, VARCHAR>,\n"
+            + " payload ROW<\n"
+            + " ride_id VARCHAR,\n"
+            + " point_idx INT,\n"
+            + " latitude DOUBLE,\n"
+            + " longitude DOUBLE,\n"
+            + " meter_reading DOUBLE,\n"
+            + " meter_increment DOUBLE,\n"
+            + " ride_status VARCHAR,\n"
+            + " passenger_count TINYINT>)\n"
+            + " TYPE pubsub \n"
+            + " LOCATION '%s'\n"
+            + " TBLPROPERTIES '{\"timestampAttributeKey\": \"ts\"}';";
+
+    // Test timestamps (inputs and expected output) are interpreted as UTC.
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
+
+  @Before
+  public void setUp() {
+    pool = Executors.newFixedThreadPool(1);
+  }
+
+  @After
+  public void tearDown() {
+    // shutdownNow() interrupts a query that is still running (e.g. after a timed-out get())
+    // instead of leaving its thread alive after the test finishes.
+    pool.shutdownNow();
+  }
+
+  @Test
+  public void testSelectFromPubsub() throws Exception {
+    String[] args =
+        buildArgs(
+            String.format(createPubsubTableStatement, eventsTopic.topicPath()),
+            setProject,
+            "SELECT event_timestamp, taxi_rides.payload.ride_status, taxi_rides.payload.latitude, "
+                + "taxi_rides.payload.longitude from taxi_rides LIMIT 3;");
+
+    Future<List<List<String>>> queryOutput = runQueryInBackground(args);
+    // Publish only after the query has subscribed, presumably so it receives every message.
+    eventsTopic.checkIfAnySubscriptionExists(project, Duration.standardMinutes(1));
+
+    List<PubsubMessage> messages =
+        ImmutableList.of(
+            message(
+                convertTimestampToMillis("2018-07-01 21:25:20"),
+                taxiRideJSON("id1", 1, 40.702, -74.001, 1000, 10, "enroute", 2)),
+            message(
+                convertTimestampToMillis("2018-07-01 21:26:06"),
+                taxiRideJSON("id2", 2, 40.703, -74.002, 1000, 10, "enroute", 4)),
+            message(
+                convertTimestampToMillis("2018-07-02 13:26:06"),
+                taxiRideJSON("id3", 3, 30.0, -72.32324, 2000, 20, "enroute", 7)));
+
+    eventsTopic.publish(messages);
+
+    assertThat(
+        Arrays.asList(
+            Arrays.asList("2018-07-01 21:25:20", "enroute", "40.702", "-74.001"),
+            Arrays.asList("2018-07-01 21:26:06", "enroute", "40.703", "-74.002"),
+            Arrays.asList("2018-07-02 13:26:06", "enroute", "30.0", "-72.32324")),
+        everyItem(IsIn.isOneOf(queryOutput.get(30, TimeUnit.SECONDS).toArray())));
+  }
+
+  @Test
+  public void testFilterForSouthManhattan() throws Exception {
+    String[] args =
+        buildArgs(
+            String.format(createPubsubTableStatement, eventsTopic.topicPath()),
+            setProject,
+            "SELECT event_timestamp, taxi_rides.payload.ride_status, \n"
+                + "taxi_rides.payload.latitude, taxi_rides.payload.longitude from taxi_rides\n"
+                + " WHERE taxi_rides.payload.longitude > -74.747\n"
+                + " AND taxi_rides.payload.longitude < -73.969\n"
+                + " AND taxi_rides.payload.latitude > 40.699\n"
+                + " AND taxi_rides.payload.latitude < 40.720 LIMIT 2;");
+
+    Future<List<List<String>>> queryOutput = runQueryInBackground(args);
+    // Publish only after the query has subscribed, presumably so it receives every message.
+    eventsTopic.checkIfAnySubscriptionExists(project, Duration.standardMinutes(1));
+
+    // Only id1 and id2 fall inside the latitude/longitude box of the WHERE clause.
+    List<PubsubMessage> messages =
+        ImmutableList.of(
+            message(
+                convertTimestampToMillis("2018-07-01 21:25:20"),
+                taxiRideJSON("id1", 1, 40.701, -74.001, 1000, 10, "enroute", 2)),
+            message(
+                convertTimestampToMillis("2018-07-01 21:26:06"),
+                taxiRideJSON("id2", 2, 40.702, -74.002, 1000, 10, "enroute", 4)),
+            message(
+                convertTimestampToMillis("2018-07-02 13:26:06"),
+                taxiRideJSON("id3", 3, 30, -72.32324, 2000, 20, "enroute", 7)),
+            message(
+                convertTimestampToMillis("2018-07-02 14:28:22"),
+                taxiRideJSON("id4", 4, 34, -73.32324, 2000, 20, "enroute", 8)));
+
+    eventsTopic.publish(messages);
+
+    assertThat(
+        Arrays.asList(
+            Arrays.asList("2018-07-01 21:25:20", "enroute", "40.701", "-74.001"),
+            Arrays.asList("2018-07-01 21:26:06", "enroute", "40.702", "-74.002")),
+        everyItem(IsIn.isOneOf(queryOutput.get(30, TimeUnit.SECONDS).toArray())));
+  }
+
+  /** Builds the JSON payload of one taxi ride event, matching the {@code payload} ROW schema. */
+  private String taxiRideJSON(
+      String rideId,
+      int pointIdx,
+      double latitude,
+      double longitude,
+      int meterReading,
+      int meterIncrement,
+      String rideStatus,
+      int passengerCount) {
+    ObjectNode objectNode = JSON_MAPPER.createObjectNode();
+    objectNode.put("ride_id", rideId);
+    objectNode.put("point_idx", pointIdx);
+    objectNode.put("latitude", latitude);
+    objectNode.put("longitude", longitude);
+    objectNode.put("meter_reading", meterReading);
+    objectNode.put("meter_increment", meterIncrement);
+    objectNode.put("ride_status", rideStatus);
+    objectNode.put("passenger_count", passengerCount);
+    return objectNode.toString();
+  }
+
+  /**
+   * Runs BeamSqlLine with {@code args} on {@link #pool}, capturing its output.
+   *
+   * @return a future holding the output split into rows of cell values
+   */
+  private Future<List<List<String>>> runQueryInBackground(String[] args) {
+    return pool.submit(
+        (Callable<List<List<String>>>)
+            () -> {
+              ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+              BeamSqlLine.runSqlLine(args, null, outputStream, null);
+              return toLines(outputStream);
+            });
+  }
+
+  /** Parses a {@code yyyy-MM-dd HH:mm:ss} UTC timestamp into epoch milliseconds. */
+  private long convertTimestampToMillis(String timestamp) throws ParseException {
+    return dateFormat.parse(timestamp).getTime();
+  }
+
+  /**
+   * Creates a Pub/Sub message whose {@code ts} attribute carries the event time. The key matches
+   * the table's {@code timestampAttributeKey}.
+   */
+  private PubsubMessage message(long timestampInMillis, String jsonPayload) {
+    return new PubsubMessage(
+        jsonPayload.getBytes(UTF_8), ImmutableMap.of("ts", String.valueOf(timestampInMillis)));
+  }
+}
diff --git
a/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineTest.java
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineTest.java
index f8ec21ce735..d14bc863efa 100644
---
a/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineTest.java
+++
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineTest.java
@@ -17,13 +17,13 @@
*/
package org.apache.beam.sdk.extensions.sql.jdbc;
-import static java.util.stream.Collectors.toList;
+import static
org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.buildArgs;
+import static
org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.toLines;
import static org.hamcrest.CoreMatchers.everyItem;
import static org.junit.Assert.assertThat;
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.hamcrest.collection.IsIn;
@@ -36,7 +36,6 @@
* tests for crashes (due to ClassNotFoundException for example). It does not
test output.
*/
public class BeamSqlLineTest {
- private static final String QUERY_ARG = "-e";
@Rule public TemporaryFolder folder = new TemporaryFolder();
@@ -198,25 +197,4 @@ public void testSqlLine_slidingWindow() throws Exception {
Arrays.asList("2018-07-01 21:26:11", "1")),
everyItem(IsIn.isOneOf(lines.toArray())));
}
-
- private String[] buildArgs(String... strs) {
- List<String> argsList = new ArrayList();
- for (String str : strs) {
- argsList.add(QUERY_ARG);
- argsList.add(str);
- }
- return argsList.toArray(new String[argsList.size()]);
- }
-
- private List<List<String>> toLines(ByteArrayOutputStream outputStream) {
- List<String> outputLines =
Arrays.asList(outputStream.toString().split("\n"));
- return
outputLines.stream().map(BeamSqlLineTest::splitFields).collect(toList());
- }
-
- private static List<String> splitFields(String outputLine) {
- return Arrays.stream(outputLine.split("\\|"))
- .map(field -> field.trim())
- .filter(field -> field.length() != 0)
- .collect(toList());
- }
}
diff --git
a/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineTestingUtils.java
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineTestingUtils.java
new file mode 100644
index 00000000000..09cfb25b6c6
--- /dev/null
+++
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineTestingUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.jdbc;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** Util functions for BeamSqlLine related tests. */
+class BeamSqlLineTestingUtils {
+
+  private static final String QUERY_ARG = "-e";
+
+  /** Static utility holder; not meant to be instantiated. */
+  private BeamSqlLineTestingUtils() {}
+
+  /**
+   * Builds BeamSqlLine command-line arguments that pass each statement with the {@code -e} flag.
+   *
+   * @param strs SQL statements, in execution order
+   * @return {@code ["-e", strs[0], "-e", strs[1], ...]}; empty when no statements are given
+   */
+  public static String[] buildArgs(String... strs) {
+    List<String> argsList = new ArrayList<>(strs.length * 2);
+    for (String str : strs) {
+      argsList.add(QUERY_ARG);
+      argsList.add(str);
+    }
+    return argsList.toArray(new String[0]);
+  }
+
+  /**
+   * Splits captured output into one row per line.
+   *
+   * <p>Each line is split on {@code |}; cells are trimmed and empty cells dropped. The bytes are
+   * decoded with the platform default charset.
+   *
+   * @param outputStream captured sqlline output
+   * @return rows of cell values, in output order
+   */
+  public static List<List<String>> toLines(ByteArrayOutputStream outputStream) {
+    List<String> outputLines = Arrays.asList(outputStream.toString().split("\n"));
+    return outputLines.stream().map(BeamSqlLineTestingUtils::splitFields).collect(toList());
+  }
+
+  /** Splits one output line on {@code |}, returning the trimmed, non-empty cells. */
+  private static List<String> splitFields(String outputLine) {
+    return Arrays.stream(outputLine.split("\\|"))
+        .map(String::trim)
+        .filter(field -> !field.isEmpty())
+        .collect(toList());
+  }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index f4d05f15c04..a31c85d4a96 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -18,16 +18,22 @@
package org.apache.beam.sdk.io.gcp.pubsub;
import static java.util.stream.Collectors.toList;
+import static
org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.projectPathFromPath;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.joda.time.DateTime;
+import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.rules.TestRule;
@@ -154,6 +160,11 @@ public TopicPath topicPath() {
return eventsTopicPath;
}
+  /**
+   * Delegates to the Pub/Sub client to list subscriptions for {@code topicPath} in
+   * {@code projectPath}.
+   */
+  private List<SubscriptionPath> listSubscriptions(ProjectPath projectPath, TopicPath topicPath)
+      throws IOException {
+    List<SubscriptionPath> subscriptions = pubsub.listSubscriptions(projectPath, topicPath);
+    return subscriptions;
+  }
+
/** Publish messages to {@link #topicPath()}. */
public void publish(List<PubsubMessage> messages) throws IOException {
List<PubsubClient.OutgoingMessage> outgoingMessages =
@@ -161,6 +172,37 @@ public void publish(List<PubsubMessage> messages) throws
IOException {
pubsub.publish(eventsTopicPath, outgoingMessages);
}
+  /**
+   * Waits until at least one subscription exists on {@link #topicPath()}.
+   *
+   * <p>Polls the subscription list about once per second. At least one check is always made,
+   * even for timeouts shorter than a second.
+   *
+   * @param project GCP project identifier, used to build the {@code projects/<project>} path.
+   * @param timeoutDuration Joda duration after which polling gives up; must be positive.
+   * @throws IllegalArgumentException if {@code timeoutDuration} is not positive.
+   * @throws TimeoutException if no subscription appears before the timeout elapses.
+   * @throws InterruptedException if interrupted while sleeping between polls.
+   * @throws IOException if listing subscriptions fails.
+   */
+  public void checkIfAnySubscriptionExists(String project, Duration timeoutDuration)
+      throws InterruptedException, IllegalArgumentException, IOException, TimeoutException {
+    if (timeoutDuration.getMillis() <= 0) {
+      throw new IllegalArgumentException("timeoutDuration should be greater than 0");
+    }
+
+    // Loop-invariant: resolve the project path once instead of on every poll.
+    ProjectPath projectPath = projectPathFromPath(String.format("projects/%s", project));
+    int timeoutSeconds = timeoutDuration.toStandardSeconds().getSeconds();
+    DateTime startTime = new DateTime();
+    while (true) {
+      if (!listSubscriptions(projectPath, topicPath()).isEmpty()) {
+        return;
+      }
+      // secondsBetween(start, end) is the non-negative elapsed time since startTime. With the
+      // arguments swapped it would be <= 0 and polling would never time out.
+      if (Seconds.secondsBetween(startTime, new DateTime()).getSeconds() >= timeoutSeconds) {
+        throw new TimeoutException(
+            "Timed out when checking if subscriptions exist for " + topicPath());
+      }
+      Thread.sleep(1000);
+    }
+  }
+
private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage
message) {
return new PubsubClient.OutgoingMessage(
message.getPayload(), message.getAttributeMap(),
DateTime.now().getMillis(), null);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 131082)
Time Spent: 19h (was: 18h 50m)
> Add an integration test for BeamSqlLine
> ---------------------------------------
>
> Key: BEAM-4808
> URL: https://issues.apache.org/jira/browse/BEAM-4808
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: Major
> Time Spent: 19h
> Remaining Estimate: 0h
>
> Test the queries in the Beam SQL Shell tutorial that do not use GROUP BY windowing.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)