kileys commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r833581164
##########
File path:
examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
##########
@@ -238,17 +238,26 @@ public static void main(String[] args) throws Exception {
Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Enforce that this pipeline is always run in streaming mode.
+
options.setStreaming(true);
- ExampleUtils exampleUtils = new ExampleUtils(options);
+ runGameStats(options);
+ }
+
+ static void runGameStats(Options options) throws IOException {
+
Pipeline pipeline = Pipeline.create(options);
+ // Using ExampleUtils to set up BigQuery resource.
+ ExampleUtils exampleUtils = new ExampleUtils(options);
Review comment:
The comment specifies that the dataset must exist. We shouldn't need to
do any setup of the BQ table here.
##########
File path:
examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
##########
@@ -170,7 +176,7 @@ public static void main(String[] args) throws Exception {
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
.apply(
- "WriteTeamScoreSums", new WriteToText<>(options.getOutput(),
configureOutput(), true));
+ "WriteTeamScoreSums", new WriteToText<>(options.getOutput(),
configureOutput(), false));
Review comment:
Why are we changing the windowing?
##########
File path:
examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
##########
@@ -124,6 +137,16 @@
String getLeaderBoardTableName();
void setLeaderBoardTableName(String value);
+
+ @Description("Path to the data file(s) containing game data.")
+ /* The default maps to a small Google Cloud Storage file (each ~8MB)
+
+ Note: You may want to use a small sample dataset to test it
locally/quickly : gs://apache-beam-samples/game/small/gaming_data.csv
+ You can also download it via the command line gsutil cp
gs://apache-beam-samples/game/small/gaming_data.csv
./destination_folder/gaming_data.csv */
+ @Default.String("gs://apache-beam-samples/game/small/gaming_data.csv")
+ String getInput();
+
+ void setInput(String value);
Review comment:
This example reads from Pub/Sub. We shouldn't need an input option here.
##########
File path:
examples/java/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreIT.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static
org.apache.beam.sdk.testing.FileChecksumMatcher.fileContentsHaveChecksum;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration Tests for {@link HourlyTeamScore}. */
+@RunWith(JUnit4.class)
+public class HourlyTeamScoreIT {
+ public static final String GAMING_DATA_CSV =
Review comment:
Let's use the default input option
##########
File path:
examples/java/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreIT.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static
org.apache.beam.sdk.testing.FileChecksumMatcher.fileContentsHaveChecksum;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration Tests for {@link HourlyTeamScore}. */
+@RunWith(JUnit4.class)
+public class HourlyTeamScoreIT {
+ public static final String GAMING_DATA_CSV =
+ "gs://apache-beam-samples/game/small/gaming_data.csv";
+ public static final String TEMP_STORAGE_DIR =
"gs://temp-storage-for-end-to-end-tests";
+ private static final String DEFAULT_OUTPUT_CHECKSUM =
"f920742fd1b363d01b0a5a44c951c683ea348a47";
+ private HourlyTeamScoreOptions options =
+ TestPipeline.testingPipelineOptions().as(HourlyTeamScoreOptions.class);
+ private static String projectId;
+
+ public interface HourlyTeamScoreOptions extends TestPipelineOptions,
HourlyTeamScore.Options {}
+
+ @Before
+ public void setupTestEnvironment() throws Exception {
+
+ PipelineOptionsFactory.register(TestPipelineOptions.class);
+ projectId =
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+ setupPipelineOptions();
+ }
+
+ @Test
+ public void testE2EHourlyTeamScore() throws Exception {
+
+ HourlyTeamScore.runHourlyTeamScore(options);
+
+ assertThat(
+ new NumberedShardedFile(options.getOutput() + "*-of-*"),
+ fileContentsHaveChecksum(DEFAULT_OUTPUT_CHECKSUM));
+ }
+
+ private void setupPipelineOptions() {
+ options.as(GcpOptions.class).setProject(projectId);
+ options.setBlockOnRun(false);
+ options.setInput(GAMING_DATA_CSV);
+ options.setOutput(
+ FileSystems.matchNewResource(TEMP_STORAGE_DIR, true)
+ .resolve(
+ String.format("hourlyteamscore-it-%tF-%<tH-%<tM-%<tS-%<tL",
new Date()),
Review comment:
Nit:
```suggestion
String.format("HourlyTeamScoreIT-%tF-%<tH-%<tM-%<tS-%<tL",
new Date()),
```
##########
File path: examples/java/build.gradle
##########
@@ -91,10 +91,18 @@ dependencies {
implementation "org.apache.commons:commons-lang3:3.9"
implementation "org.apache.httpcomponents:httpclient:4.5.13"
implementation "org.apache.httpcomponents:httpcore:4.4.13"
+ implementation project(path: ":runners:flink:1.11")
Review comment:
Why do the examples need a Flink dependency?
##########
File path:
examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
##########
@@ -115,18 +115,27 @@
public static void main(String[] args) throws Exception {
Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
- ExampleUtils exampleUtils = new ExampleUtils(options);
+ runStatefulTeamScore(options);
+ }
+
+ static void runStatefulTeamScore(Options options) throws IOException {
+
Pipeline pipeline = Pipeline.create(options);
+ // Using ExampleUtils to set up BigQuery resource.
+ ExampleUtils exampleUtils = new ExampleUtils(options);
+ exampleUtils.setupBigQueryTable();
Review comment:
Same question about the table here
##########
File path: runners/flink/flink_runner.gradle
##########
@@ -314,6 +314,8 @@ tasks.register("examplesIntegrationTest", Test) {
excludeTestsMatching
'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
// TODO (BEAM-14019) Fix integration Tests to run with FlinkRunner:
Error deleting table, Not found: Dataset
excludeTestsMatching
'org.apache.beam.examples.cookbook.BigQueryTornadoesIT.testE2eBigQueryTornadoesWithStorageApiUsingQuery'
+ // TODO Fix GameStats Example
Review comment:
Can you leave more info about what's failing?
##########
File path: runners/direct-java/build.gradle
##########
@@ -218,6 +218,8 @@ task examplesIntegrationTest(type: Test) {
// TODO (BEAM-14019) Fix integration Tests to run with DirectRunner:
Timeout error
excludeTestsMatching 'org.apache.beam.examples.complete.TfIdfIT'
excludeTestsMatching
'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
+ // TODO Fix GameStats Example
+ excludeTestsMatching 'org.apache.beam.examples.complete.game.GameStatsIT'
Review comment:
Can you leave more info about what's failing?
##########
File path:
examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
##########
@@ -195,17 +218,25 @@ public static void main(String[] args) throws Exception {
Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
- ExampleUtils exampleUtils = new ExampleUtils(options);
+ runLeaderBoard(options);
+ }
+
+ static void runLeaderBoard(Options options) throws IOException {
+
Pipeline pipeline = Pipeline.create(options);
+ // Using ExampleUtils to set up BigQuery resource.
+ ExampleUtils exampleUtils = new ExampleUtils(options);
Review comment:
Same comment as before
##########
File path:
examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+ public static final String GAMING_DATA_CSV =
+ "gs://apache-beam-samples/game/small/gaming_data.csv";
Review comment:
Yes, that works
##########
File path:
examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
##########
@@ -242,13 +242,20 @@ public static void main(String[] args) throws Exception {
ExampleUtils exampleUtils = new ExampleUtils(options);
Pipeline pipeline = Pipeline.create(options);
+ // Run the pipeline and wait for the pipeline to finish; capture
cancellation requests from the
+ // command line.
+ PipelineResult result = runGameStats(options, pipeline);
+ exampleUtils.waitToFinish(result);
+ }
+
+ static PipelineResult runGameStats(Options options, Pipeline pipeline) {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents =
pipeline
.apply(
PubsubIO.readStrings()
- .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
- .fromTopic(options.getTopic()))
+ .fromSubscription(options.getSubscription())
Review comment:
Can we just delete the topic in the cleanup of the integration test? It
looks like subscriptions get deleted if they're inactive. I see this on some
subscriptions in the project:
Subscription expiration
Subscription expires in 31 days if there is no activity.
##########
File path:
examples/java/src/test/java/org/apache/beam/examples/complete/game/GameStatsIT.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GameStats}. */
+@RunWith(JUnit4.class)
+public class GameStatsIT {
+ private static final DateTimeFormatter DATETIME_FORMAT =
+ DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+ private static final String EVENTS_TOPIC_NAME = "events";
+ public static final String GAME_STATS_TEAM_TABLE = "game_stats_team";
+ private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
+ public static final String SELECT_COUNT_AS_TOTAL_QUERY =
+ "SELECT total_score FROM `%s.%s.%s` where team like(\"AmaranthKoala\")";
+ private GameStatsOptions options =
+
TestPipeline.testingPipelineOptions().as(GameStatsIT.GameStatsOptions.class);
+ @Rule public final transient TestPipeline testPipeline =
TestPipeline.fromOptions(options);
+ private static String pubsubEndpoint;
+ private @Nullable ManagedChannel channel = null;
+ private @Nullable TransportChannelProvider channelProvider = null;
+ private @Nullable TopicAdminClient topicAdmin = null;
+ private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
+ private @Nullable TopicPath eventsTopicPath = null;
+ private @Nullable SubscriptionPath subscriptionPath = null;
+ private String projectId;
+ private static final String TOPIC_PREFIX = "gamestats-";
+ private BigqueryClient bqClient;
+ private final String OUTPUT_DATASET = "game_stats_e2e";
+
+ public interface GameStatsOptions extends TestPipelineOptions,
GameStats.Options {};
+
+ @Before
+ public void setupTestEnvironment() throws Exception {
+ PipelineOptionsFactory.register(TestPipelineOptions.class);
+ projectId =
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+ setupBigQuery();
Review comment:
I meant that instead of writing your own setupPubSub and setupBigQuery, you
can use the setup in ExampleUtils that does exactly that. It also has a
method, waitToFinish, that'll tear down Pub/Sub and BQ resources after the
pipeline finishes running.
##########
File path:
examples/java/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreIT.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static
org.apache.beam.sdk.testing.FileChecksumMatcher.fileContentsHaveChecksum;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration Tests for {@link HourlyTeamScore}. */
+@RunWith(JUnit4.class)
+public class HourlyTeamScoreIT {
+ public static final String GAMING_DATA_CSV =
+ "gs://apache-beam-samples/game/small/gaming_data.csv";
+ public static final String TEMP_STORAGE_DIR =
"gs://temp-storage-for-end-to-end-tests";
+ private static final String DEFAULT_OUTPUT_CHECKSUM =
"f920742fd1b363d01b0a5a44c951c683ea348a47";
+ private HourlyTeamScoreOptions options =
+ TestPipeline.testingPipelineOptions().as(HourlyTeamScoreOptions.class);
+ private static String projectId;
+
+ public interface HourlyTeamScoreOptions extends TestPipelineOptions,
HourlyTeamScore.Options {}
+
+ @Before
+ public void setupTestEnvironment() throws Exception {
+
+ PipelineOptionsFactory.register(TestPipelineOptions.class);
+ projectId =
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+ setupPipelineOptions();
+ }
+
+ @Test
+ public void testE2EHourlyTeamScore() throws Exception {
+
+ HourlyTeamScore.runHourlyTeamScore(options);
+
+ assertThat(
+ new NumberedShardedFile(options.getOutput() + "*-of-*"),
+ fileContentsHaveChecksum(DEFAULT_OUTPUT_CHECKSUM));
+ }
+
+ private void setupPipelineOptions() {
+ options.as(GcpOptions.class).setProject(projectId);
+ options.setBlockOnRun(false);
+ options.setInput(GAMING_DATA_CSV);
+ options.setOutput(
+ FileSystems.matchNewResource(TEMP_STORAGE_DIR, true)
Review comment:
Should be able to use temp root
```suggestion
FileSystems.matchNewResource(options.getTempRoot(), true)
```
##########
File path:
examples/java/src/test/java/org/apache/beam/examples/complete/game/GameStatsIT.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GameStats}. */
+@RunWith(JUnit4.class)
+public class GameStatsIT {
+ private static final DateTimeFormatter DATETIME_FORMAT =
+ DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+ private static final String EVENTS_TOPIC_NAME = "events";
+ public static final String GAME_STATS_TEAM_TABLE = "game_stats_team";
+ private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
+ public static final String SELECT_COUNT_AS_TOTAL_QUERY =
+ "SELECT total_score FROM `%s.%s.%s` where team like(\"AmaranthKoala\")";
+ private GameStatsOptions options =
+
TestPipeline.testingPipelineOptions().as(GameStatsIT.GameStatsOptions.class);
+ @Rule public final transient TestPipeline testPipeline =
TestPipeline.fromOptions(options);
+ private static String pubsubEndpoint;
+ private @Nullable ManagedChannel channel = null;
+ private @Nullable TransportChannelProvider channelProvider = null;
+ private @Nullable TopicAdminClient topicAdmin = null;
+ private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
+ private @Nullable TopicPath eventsTopicPath = null;
+ private @Nullable SubscriptionPath subscriptionPath = null;
+ private String projectId;
+ private static final String TOPIC_PREFIX = "gamestats-";
+ private BigqueryClient bqClient;
+ private final String OUTPUT_DATASET = "game_stats_e2e";
+
+ public interface GameStatsOptions extends TestPipelineOptions,
GameStats.Options {};
+
+ @Before
+ public void setupTestEnvironment() throws Exception {
+ PipelineOptionsFactory.register(TestPipelineOptions.class);
+ projectId =
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+ setupBigQuery();
Review comment:
You can specify the names through the options
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]