Change Nexmark pom structure to mirror other modules on Beam. Fix compile after PubsubIO refactor.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8098bb1d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8098bb1d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8098bb1d Branch: refs/heads/master Commit: 8098bb1dbcc22153960d9b4483327e2977641148 Parents: 7ef49dc Author: Ismaël Mejía <[email protected]> Authored: Thu Apr 13 10:47:54 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/pom.xml | 48 +- .../beam/integration/nexmark/NexmarkRunner.java | 2 +- .../integration/nexmark/io/PubsubClient.java | 543 +++++++++++++++++++ .../integration/nexmark/io/PubsubHelper.java | 2 - .../nexmark/io/PubsubJsonClient.java | 318 +++++++++++ .../nexmark/io/PubsubTestClient.java | 436 +++++++++++++++ integration/java/pom.xml | 37 ++ integration/pom.xml | 37 ++ pom.xml | 2 +- 9 files changed, 1401 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index 7cd7d39..67d6117 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -22,19 +22,17 @@ <parent> <groupId>org.apache.beam</groupId> - <artifactId>beam-parent</artifactId> + <artifactId>beam-integration-java-parent</artifactId> <version>0.7.0-SNAPSHOT</version> - <relativePath>../../../pom.xml</relativePath> + <relativePath>../pom.xml</relativePath> </parent> - <artifactId>beam-integration-java</artifactId> + <artifactId>beam-integration-java-nexmark</artifactId> <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name> <packaging>jar</packaging> <properties> - 
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <flink.version>1.2.0</flink.version> <spark.version>1.6.3</spark.version> <apex.codehaus.jackson.version>1.9.3</apex.codehaus.jackson.version> @@ -253,11 +251,36 @@ </dependency> <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-pubsub</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-credentials</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-oauth2-http</artifactId> + </dependency> + + <dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>gcsio</artifactId> </dependency> <dependency> + <groupId>com.google.cloud.bigdataoss</groupId> + <artifactId>util</artifactId> + </dependency> + + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client</artifactId> + </dependency> + + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </dependency> @@ -288,13 +311,6 @@ <scope>compile</scope> </dependency> - <!--<dependency>--> - <!--<groupId>org.slf4j</groupId>--> - <!--<artifactId>slf4j-jdk14</artifactId>--> - <!--<!– When loaded at runtime this will wire up slf4j to the JUL backend –>--> - <!--<scope>runtime</scope>--> - <!--</dependency>--> - <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> @@ -305,13 +321,5 @@ <artifactId>junit</artifactId> <scope>compile</scope> </dependency> - - <!--<dependency>--> - <!--<groupId>io.netty</groupId>--> - <!--<artifactId>netty-tcnative-boringssl-static</artifactId>--> - <!--<version>1.1.33.Fork13</version>--> - <!--<classifier>${os.detected.classifier}</classifier>--> - <!--<scope>runtime</scope>--> - <!--</dependency>--> </dependencies> </project> 
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index df1000a..3a0452f 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -67,9 +67,9 @@ import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.PubsubIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java new file mode 100644 index 0000000..687aa35 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark.io; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.client.util.DateTime; +import com.google.common.base.Objects; +import com.google.common.base.Strings; +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PubsubOptions; + +/** + * An (abstract) helper class for talking to Pubsub via an underlying transport. + */ +abstract class PubsubClient implements Closeable { + /** + * Factory for creating clients. + */ + public interface PubsubClientFactory extends Serializable { + /** + * Construct a new Pubsub client. It should be closed via {@link #close} in order + * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources + * construct). Uses {@code options} to derive pubsub endpoints and application credentials. + * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom + * timestamps/ids within message metadata. 
+ */ + PubsubClient newClient(@Nullable String timestampLabel, + @Nullable String idLabel, PubsubOptions options) throws IOException; + + /** + * Return the display name for this factory. Eg "Json", "gRPC". + */ + String getKind(); + } + + /** + * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}. + * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException} + * if timestamp cannot be recognized. + */ + @Nullable + private static Long asMsSinceEpoch(@Nullable String timestamp) { + if (Strings.isNullOrEmpty(timestamp)) { + return null; + } + try { + // Try parsing as milliseconds since epoch. Note there is no way to parse a + // string in RFC 3339 format here. + // Expected IllegalArgumentException if parsing fails; we use that to fall back + // to RFC 3339. + return Long.parseLong(timestamp); + } catch (IllegalArgumentException e1) { + // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an + // IllegalArgumentException if parsing fails, and the caller should handle. + return DateTime.parseRfc3339(timestamp).getValue(); + } + } + + /** + * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code + * attributes} and {@code pubsubTimestamp}. + * + * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain + * that label, and the value of that label will be taken as the timestamp. + * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code + * pubsubTimestamp}. + * + * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch + * or RFC3339 time. 
+ */ + protected static long extractTimestamp( + @Nullable String timestampLabel, + @Nullable String pubsubTimestamp, + @Nullable Map<String, String> attributes) { + Long timestampMsSinceEpoch; + if (Strings.isNullOrEmpty(timestampLabel)) { + timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp); + checkArgument(timestampMsSinceEpoch != null, + "Cannot interpret PubSub publish timestamp: %s", + pubsubTimestamp); + } else { + String value = attributes == null ? null : attributes.get(timestampLabel); + checkArgument(value != null, + "PubSub message is missing a value for timestamp label %s", + timestampLabel); + timestampMsSinceEpoch = asMsSinceEpoch(value); + checkArgument(timestampMsSinceEpoch != null, + "Cannot interpret value of label %s as timestamp: %s", + timestampLabel, value); + } + return timestampMsSinceEpoch; + } + + /** + * Path representing a cloud project id. + */ + static class ProjectPath implements Serializable { + private final String projectId; + + /** + * Creates a {@link ProjectPath} from a {@link String} representation, which + * must be of the form {@code "projects/" + projectId}. 
+ */ + ProjectPath(String path) { + String[] splits = path.split("/"); + checkArgument( + splits.length == 2 && splits[0].equals("projects"), + "Malformed project path \"%s\": must be of the form \"projects/\" + <project id>", + path); + this.projectId = splits[1]; + } + + public String getPath() { + return String.format("projects/%s", projectId); + } + + public String getId() { + return projectId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ProjectPath that = (ProjectPath) o; + + return projectId.equals(that.projectId); + } + + @Override + public int hashCode() { + return projectId.hashCode(); + } + + @Override + public String toString() { + return getPath(); + } + } + + public static ProjectPath projectPathFromPath(String path) { + return new ProjectPath(path); + } + + public static ProjectPath projectPathFromId(String projectId) { + return new ProjectPath(String.format("projects/%s", projectId)); + } + + /** + * Path representing a Pubsub subscription. 
+ */ + public static class SubscriptionPath implements Serializable { + private final String projectId; + private final String subscriptionName; + + SubscriptionPath(String path) { + String[] splits = path.split("/"); + checkState( + splits.length == 4 && splits[0].equals("projects") && splits[2].equals("subscriptions"), + "Malformed subscription path %s: " + + "must be of the form \"projects/\" + <project id> + \"subscriptions\"", path); + this.projectId = splits[1]; + this.subscriptionName = splits[3]; + } + + public String getPath() { + return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName); + } + + public String getName() { + return subscriptionName; + } + + public String getV1Beta1Path() { + return String.format("/subscriptions/%s/%s", projectId, subscriptionName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubscriptionPath that = (SubscriptionPath) o; + return this.subscriptionName.equals(that.subscriptionName) + && this.projectId.equals(that.projectId); + } + + @Override + public int hashCode() { + return Objects.hashCode(projectId, subscriptionName); + } + + @Override + public String toString() { + return getPath(); + } + } + + public static SubscriptionPath subscriptionPathFromPath(String path) { + return new SubscriptionPath(path); + } + + public static SubscriptionPath subscriptionPathFromName( + String projectId, String subscriptionName) { + return new SubscriptionPath(String.format("projects/%s/subscriptions/%s", + projectId, subscriptionName)); + } + + /** + * Path representing a Pubsub topic. 
+ */ + public static class TopicPath implements Serializable { + private final String path; + + TopicPath(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + public String getName() { + String[] splits = path.split("/"); + checkState(splits.length == 4, "Malformed topic path %s", path); + return splits[3]; + } + + public String getV1Beta1Path() { + String[] splits = path.split("/"); + checkState(splits.length == 4, "Malformed topic path %s", path); + return String.format("/topics/%s/%s", splits[1], splits[3]); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopicPath topicPath = (TopicPath) o; + return path.equals(topicPath.path); + } + + @Override + public int hashCode() { + return path.hashCode(); + } + + @Override + public String toString() { + return path; + } + } + + public static TopicPath topicPathFromPath(String path) { + return new TopicPath(path); + } + + public static TopicPath topicPathFromName(String projectId, String topicName) { + return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); + } + + /** + * A message to be sent to Pubsub. + * + * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. + * Java serialization is never used for non-test clients. + */ + static class OutgoingMessage implements Serializable { + /** + * Underlying (encoded) element. + */ + public final byte[] elementBytes; + + public final Map<String, String> attributes; + + /** + * Timestamp for element (ms since epoch). + */ + public final long timestampMsSinceEpoch; + + /** + * If using an id label, the record id to associate with this record's metadata so the receiver + * can reject duplicates. Otherwise {@literal null}. 
+ */ + @Nullable + public final String recordId; + + public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes, + long timestampMsSinceEpoch, @Nullable String recordId) { + this.elementBytes = elementBytes; + this.attributes = attributes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.recordId = recordId; + } + + @Override + public String toString() { + return String.format("OutgoingMessage(%db, %dms)", + elementBytes.length, timestampMsSinceEpoch); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OutgoingMessage that = (OutgoingMessage) o; + + return timestampMsSinceEpoch == that.timestampMsSinceEpoch + && Arrays.equals(elementBytes, that.elementBytes) + && Objects.equal(attributes, that.attributes) + && Objects.equal(recordId, that.recordId); + } + + @Override + public int hashCode() { + return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, + recordId); + } + } + + /** + * A message received from Pubsub. + * + * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. + * Java serialization is never used for non-test clients. + */ + static class IncomingMessage implements Serializable { + /** + * Underlying (encoded) element. + */ + public final byte[] elementBytes; + + public Map<String, String> attributes; + + /** + * Timestamp for element (ms since epoch). Either Pubsub's processing time, + * or the custom timestamp associated with the message. + */ + public final long timestampMsSinceEpoch; + + /** + * Timestamp (in system time) at which we requested the message (ms since epoch). + */ + public final long requestTimeMsSinceEpoch; + + /** + * Id to pass back to Pubsub to acknowledge receipt of this message. + */ + public final String ackId; + + /** + * Id to pass to the runner to distinguish this message from all others. 
+ */ + public final String recordId; + + public IncomingMessage( + byte[] elementBytes, + Map<String, String> attributes, + long timestampMsSinceEpoch, + long requestTimeMsSinceEpoch, + String ackId, + String recordId) { + this.elementBytes = elementBytes; + this.attributes = attributes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; + this.ackId = ackId; + this.recordId = recordId; + } + + public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) { + return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, ackId, recordId); + } + + @Override + public String toString() { + return String.format("IncomingMessage(%db, %dms)", + elementBytes.length, timestampMsSinceEpoch); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + IncomingMessage that = (IncomingMessage) o; + + return timestampMsSinceEpoch == that.timestampMsSinceEpoch + && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch + && ackId.equals(that.ackId) + && recordId.equals(that.recordId) + && Arrays.equals(elementBytes, that.elementBytes) + && Objects.equal(attributes, that.attributes); + } + + @Override + public int hashCode() { + return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, + ackId, recordId); + } + } + + /** + * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages + * published. + */ + public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) + throws IOException; + + /** + * Request the next batch of up to {@code batchSize} messages from {@code subscription}. + * Return the received messages, or empty collection if none were available. Does not + * wait for messages to arrive if {@code returnImmediately} is {@literal true}. 
+ * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}. + */ + public abstract List<IncomingMessage> pull( + long requestTimeMsSinceEpoch, + SubscriptionPath subscription, + int batchSize, + boolean returnImmediately) + throws IOException; + + /** + * Acknowledge messages from {@code subscription} with {@code ackIds}. + */ + public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds) + throws IOException; + + /** + * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to + * be {@code deadlineSeconds} from now. + */ + public abstract void modifyAckDeadline( + SubscriptionPath subscription, List<String> ackIds, + int deadlineSeconds) throws IOException; + + /** + * Create {@code topic}. + */ + public abstract void createTopic(TopicPath topic) throws IOException; + + /* + * Delete {@code topic}. + */ + public abstract void deleteTopic(TopicPath topic) throws IOException; + + /** + * Return a list of topics for {@code project}. + */ + public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException; + + /** + * Create {@code subscription} to {@code topic}. + */ + public abstract void createSubscription( + TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; + + /** + * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It + * is the responsibility of the caller to later delete the subscription. + */ + public SubscriptionPath createRandomSubscription( + ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException { + // Create a randomized subscription derived from the topic name. 
+ String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong(); + SubscriptionPath subscription = + PubsubClient + .subscriptionPathFromName(project.getId(), subscriptionName); + createSubscription(topic, subscription, ackDeadlineSeconds); + return subscription; + } + + /** + * Delete {@code subscription}. + */ + public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException; + + /** + * Return a list of subscriptions for {@code topic} in {@code project}. + */ + public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException; + + /** + * Return the ack deadline, in seconds, for {@code subscription}. + */ + public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException; + + /** + * Return {@literal true} if {@link #pull} will always return empty list. Actual clients + * will return {@literal false}. Test clients may return {@literal true} to signal that all + * expected messages have been pulled and the test may complete. 
+ */ + public abstract boolean isEOF(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java index f5cfc2b..15401b7 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java @@ -24,8 +24,6 @@ import java.util.List; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubJsonClient; /** * Helper for working with pubsub. http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java new file mode 100644 index 0000000..b778a09 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark.io; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.Pubsub.Builder; +import com.google.api.services.pubsub.model.AcknowledgeRequest; +import com.google.api.services.pubsub.model.ListSubscriptionsResponse; +import com.google.api.services.pubsub.model.ListTopicsResponse; +import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.api.services.pubsub.model.PullRequest; +import com.google.api.services.pubsub.model.PullResponse; +import com.google.api.services.pubsub.model.ReceivedMessage; +import com.google.api.services.pubsub.model.Subscription; +import com.google.api.services.pubsub.model.Topic; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.ArrayList; +import 
java.util.List; +import java.util.Map; +import java.util.TreeMap; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.util.RetryHttpRequestInitializer; +import org.apache.beam.sdk.util.Transport; + +/** + * A Pubsub client using JSON transport. + */ +class PubsubJsonClient extends PubsubClient { + + private static class PubsubJsonClientFactory implements PubsubClientFactory { + private static HttpRequestInitializer chainHttpRequestInitializer( + Credentials credential, HttpRequestInitializer httpRequestInitializer) { + if (credential == null) { + return httpRequestInitializer; + } else { + return new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + httpRequestInitializer); + } + } + + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + Pubsub pubsub = new Builder( + Transport.getTransport(), + Transport.getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setRootUrl(options.getPubsubRootUrl()) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) + .build(); + return new PubsubJsonClient(timestampLabel, idLabel, pubsub); + } + + @Override + public String getKind() { + return "Json"; + } + } + + /** + * Factory for creating Pubsub clients using Json transport. + */ + public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory(); + + /** + * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time + * instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. 
+ */ + @Nullable + private final String idLabel; + + /** + * Underlying JSON transport. + */ + private Pubsub pubsub; + + @VisibleForTesting PubsubJsonClient( + @Nullable String timestampLabel, + @Nullable String idLabel, + Pubsub pubsub) { + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.pubsub = pubsub; + } + + @Override + public void close() { + // Nothing to close. + } + + @Override + public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) + throws IOException { + List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size()); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); + + Map<String, String> attributes = outgoingMessage.attributes; + if ((timestampLabel != null || idLabel != null) && attributes == null) { + attributes = new TreeMap<>(); + } + if (attributes != null) { + pubsubMessage.setAttributes(attributes); + } + + if (timestampLabel != null) { + attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); + } + + if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + attributes.put(idLabel, outgoingMessage.recordId); + } + + pubsubMessages.add(pubsubMessage); + } + PublishRequest request = new PublishRequest().setMessages(pubsubMessages); + PublishResponse response = pubsub.projects() + .topics() + .publish(topic.getPath(), request) + .execute(); + return response.getMessageIds().size(); + } + + @Override + public List<IncomingMessage> pull( + long requestTimeMsSinceEpoch, + SubscriptionPath subscription, + int batchSize, + boolean returnImmediately) throws IOException { + PullRequest request = new PullRequest() + .setReturnImmediately(returnImmediately) + .setMaxMessages(batchSize); + PullResponse response = pubsub.projects() + .subscriptions() + .pull(subscription.getPath(), request) + .execute(); + if (response.getReceivedMessages() == 
null || response.getReceivedMessages().size() == 0) { + return ImmutableList.of(); + } + List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size()); + for (ReceivedMessage message : response.getReceivedMessages()) { + PubsubMessage pubsubMessage = message.getMessage(); + @Nullable Map<String, String> attributes = pubsubMessage.getAttributes(); + + // Payload. + byte[] elementBytes = pubsubMessage.decodeData(); + + // Timestamp. + long timestampMsSinceEpoch = + extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes); + + // Ack id. + String ackId = message.getAckId(); + checkState(!Strings.isNullOrEmpty(ackId)); + + // Record id, if any. + @Nullable String recordId = null; + if (idLabel != null && attributes != null) { + recordId = attributes.get(idLabel); + } + if (Strings.isNullOrEmpty(recordId)) { + // Fall back to the Pubsub provided message id. + recordId = pubsubMessage.getMessageId(); + } + + incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, ackId, recordId)); + } + + return incomingMessages; + } + + @Override + public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException { + AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds); + pubsub.projects() + .subscriptions() + .acknowledge(subscription.getPath(), request) + .execute(); // ignore Empty result. + } + + @Override + public void modifyAckDeadline( + SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) + throws IOException { + ModifyAckDeadlineRequest request = + new ModifyAckDeadlineRequest().setAckIds(ackIds) + .setAckDeadlineSeconds(deadlineSeconds); + pubsub.projects() + .subscriptions() + .modifyAckDeadline(subscription.getPath(), request) + .execute(); // ignore Empty result. 
+ } + + @Override + public void createTopic(TopicPath topic) throws IOException { + pubsub.projects() + .topics() + .create(topic.getPath(), new Topic()) + .execute(); // ignore Topic result. + } + + @Override + public void deleteTopic(TopicPath topic) throws IOException { + pubsub.projects() + .topics() + .delete(topic.getPath()) + .execute(); // ignore Empty result. + } + + @Override + public List<TopicPath> listTopics(ProjectPath project) throws IOException { + ListTopicsResponse response = pubsub.projects() + .topics() + .list(project.getPath()) + .execute(); + if (response.getTopics() == null || response.getTopics().isEmpty()) { + return ImmutableList.of(); + } + List<TopicPath> topics = new ArrayList<>(response.getTopics().size()); + for (Topic topic : response.getTopics()) { + topics.add(topicPathFromPath(topic.getName())); + } + return topics; + } + + @Override + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, + int ackDeadlineSeconds) throws IOException { + Subscription request = new Subscription() + .setTopic(topic.getPath()) + .setAckDeadlineSeconds(ackDeadlineSeconds); + pubsub.projects() + .subscriptions() + .create(subscription.getPath(), request) + .execute(); // ignore Subscription result. + } + + @Override + public void deleteSubscription(SubscriptionPath subscription) throws IOException { + pubsub.projects() + .subscriptions() + .delete(subscription.getPath()) + .execute(); // ignore Empty result. 
+ } + + @Override + public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException { + ListSubscriptionsResponse response = pubsub.projects() + .subscriptions() + .list(project.getPath()) + .execute(); + if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) { + return ImmutableList.of(); + } + List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size()); + for (Subscription subscription : response.getSubscriptions()) { + if (subscription.getTopic().equals(topic.getPath())) { + subscriptions.add(subscriptionPathFromPath(subscription.getName())); + } + } + return subscriptions; + } + + @Override + public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { + Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute(); + return response.getAckDeadlineSeconds(); + } + + @Override + public boolean isEOF() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java new file mode 100644 index 0000000..125a8d6 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.integration.nexmark.io;

import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.util.Clock;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PubsubOptions;

/**
 * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
 * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
 * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
 *
 * <p>All access to the shared simulated-service state is guarded by the monitor of the single
 * static {@code STATE} instance.
 */
class PubsubTestClient extends PubsubClient implements Serializable {
  /**
   * Mimic the state of the simulated Pubsub 'service'.
   *
   * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
   * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
   * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
   * following data structures over all clients and must resort to a static.
   */
  private static class State {
    /**
     * True if has been primed for a test but not yet validated.
     */
    boolean isActive;

    /**
     * Publish mode only: Only publish calls for this topic are allowed.
     */
    @Nullable
    TopicPath expectedTopic;

    /**
     * Publish mode only: Messages yet to be seen in a {@link #publish} call.
     */
    @Nullable
    Set<OutgoingMessage> remainingExpectedOutgoingMessages;

    /**
     * Publish mode only: Messages which should throw when first sent to simulate transient publish
     * failure.
     */
    @Nullable
    Set<OutgoingMessage> remainingFailingOutgoingMessages;

    /**
     * Pull mode only: Clock from which to get current time.
     */
    @Nullable
    Clock clock;

    /**
     * Pull mode only: Only pull calls for this subscription are allowed.
     */
    @Nullable
    SubscriptionPath expectedSubscription;

    /**
     * Pull mode only: Timeout to simulate.
     */
    int ackTimeoutSec;

    /**
     * Pull mode only: Messages waiting to be received by a {@link #pull} call.
     */
    @Nullable
    List<IncomingMessage> remainingPendingIncomingMessages;

    /**
     * Pull mode only: Messages which have been returned from a {@link #pull} call and
     * not yet ACKed by an {@link #acknowledge} call.
     */
    @Nullable
    Map<String, IncomingMessage> pendingAckIncomingMessages;

    /**
     * Pull mode only: When above messages are due to have their ACK deadlines expire.
     */
    @Nullable
    Map<String, Long> ackDeadline;

    /**
     * Return every field to its initial value: inactive and in neither publish nor pull mode.
     *
     * <p>The mode predicates are derived from {@link #expectedTopic} and
     * {@link #expectedSubscription}, so both must be cleared when a test finishes; otherwise a
     * later test of the other kind would appear to be in both modes at once.
     */
    void reset() {
      isActive = false;
      expectedTopic = null;
      remainingExpectedOutgoingMessages = null;
      remainingFailingOutgoingMessages = null;
      clock = null;
      expectedSubscription = null;
      ackTimeoutSec = 0;
      remainingPendingIncomingMessages = null;
      pendingAckIncomingMessages = null;
      ackDeadline = null;
    }
  }

  private static final State STATE = new State();

  /** Closing the factory will validate all expected messages were processed. */
  public interface PubsubTestClientFactory
      extends PubsubClientFactory, Closeable, Serializable {
  }

  /**
   * Return a factory for testing publishers. Only one factory may be in-flight at a time.
   * The factory must be closed when the test is complete, at which point final validation will
   * occur.
   *
   * @param expectedTopic the only topic {@link #publish} may be called with
   * @param expectedOutgoingMessages messages which must all be published before the factory is
   *     closed
   * @param failingOutgoingMessages messages whose first publish attempt throws
   * @throws IllegalStateException if another test factory is still in flight
   */
  static PubsubTestClientFactory createFactoryForPublish(
      final TopicPath expectedTopic,
      final Iterable<OutgoingMessage> expectedOutgoingMessages,
      final Iterable<OutgoingMessage> failingOutgoingMessages) {
    synchronized (STATE) {
      checkState(!STATE.isActive, "Test still in flight");
      STATE.expectedTopic = expectedTopic;
      STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
      STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
      STATE.isActive = true;
    }
    return new PubsubTestClientFactory() {
      @Override
      public PubsubClient newClient(
          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
          throws IOException {
        return new PubsubTestClient();
      }

      @Override
      public String getKind() {
        return "PublishTest";
      }

      @Override
      public void close() {
        synchronized (STATE) {
          try {
            checkState(STATE.isActive, "No test still in flight");
            checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
                "Still waiting for %s messages to be published",
                STATE.remainingExpectedOutgoingMessages.size());
          } finally {
            // Always release the simulated service, even when validation fails, so that one
            // failing test cannot leave every later test failing with "Test still in flight".
            STATE.reset();
          }
        }
      }
    };
  }

  /**
   * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
   * The factory must be closed when the test is complete, at which point final validation will
   * occur.
   *
   * @param clock source of the simulated current time
   * @param expectedSubscription the only subscription pull-side calls may be made with
   * @param ackTimeoutSec simulated ACK deadline, in seconds
   * @param expectedIncomingMessages messages which must all be pulled and ACKed before the
   *     factory is closed
   * @throws IllegalStateException if another test factory is still in flight
   */
  public static PubsubTestClientFactory createFactoryForPull(
      final Clock clock,
      final SubscriptionPath expectedSubscription,
      final int ackTimeoutSec,
      final Iterable<IncomingMessage> expectedIncomingMessages) {
    synchronized (STATE) {
      checkState(!STATE.isActive, "Test still in flight");
      STATE.clock = clock;
      STATE.expectedSubscription = expectedSubscription;
      STATE.ackTimeoutSec = ackTimeoutSec;
      STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
      STATE.pendingAckIncomingMessages = new HashMap<>();
      STATE.ackDeadline = new HashMap<>();
      STATE.isActive = true;
    }
    return new PubsubTestClientFactory() {
      @Override
      public PubsubClient newClient(
          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
          throws IOException {
        return new PubsubTestClient();
      }

      @Override
      public String getKind() {
        return "PullTest";
      }

      @Override
      public void close() {
        synchronized (STATE) {
          try {
            checkState(STATE.isActive, "No test still in flight");
            checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
                "Still waiting for %s messages to be pulled",
                STATE.remainingPendingIncomingMessages.size());
            checkState(STATE.pendingAckIncomingMessages.isEmpty(),
                "Still waiting for %s messages to be ACKed",
                STATE.pendingAckIncomingMessages.size());
            checkState(STATE.ackDeadline.isEmpty(),
                "Still waiting for %s messages to be ACKed",
                STATE.ackDeadline.size());
          } finally {
            // Always release the simulated service, even when validation fails, so that one
            // failing test cannot leave every later test failing with "Test still in flight".
            STATE.reset();
          }
        }
      }
    };
  }

  /**
   * Return a factory whose clients accept a single {@link #createSubscription} call. Closing the
   * factory verifies that exactly one subscription was created. This factory does not touch the
   * shared simulated-service state.
   */
  public static PubsubTestClientFactory createFactoryForCreateSubscription() {
    return new PubsubTestClientFactory() {
      int numCalls = 0;

      @Override
      public void close() throws IOException {
        checkState(
            numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
      }

      @Override
      public PubsubClient newClient(
          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
          throws IOException {
        return new PubsubTestClient() {
          @Override
          public void createSubscription(
              TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
              throws IOException {
            checkState(numCalls == 0, "Expected at most one subscription to be created");
            numCalls++;
          }
        };
      }

      @Override
      public String getKind() {
        return "CreateSubscriptionTest";
      }
    };
  }

  /**
   * Return true if in pull mode. Callers in this class invoke this while holding the
   * {@code STATE} monitor.
   *
   * @throws IllegalStateException if no test is active
   */
  private boolean inPullMode() {
    checkState(STATE.isActive, "No test is active");
    return STATE.expectedSubscription != null;
  }

  /**
   * Return true if in publish mode. Callers in this class invoke this while holding the
   * {@code STATE} monitor.
   *
   * @throws IllegalStateException if no test is active
   */
  private boolean inPublishMode() {
    checkState(STATE.isActive, "No test is active");
    return STATE.expectedTopic != null;
  }

  /**
   * For subscription mode only:
   * Track progression of time according to the {@link Clock} passed to
   * {@link #createFactoryForPull}. This will simulate Pubsub expiring outstanding ACKs.
   */
  public void advance() {
    synchronized (STATE) {
      checkState(inPullMode(), "Can only advance in pull mode");
      long now = STATE.clock.currentTimeMillis();
      // Any messages whose ACKs timed out are available for re-pulling.
      Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
      while (deadlineItr.hasNext()) {
        Map.Entry<String, Long> entry = deadlineItr.next();
        if (entry.getValue() <= now) {
          STATE.remainingPendingIncomingMessages.add(
              STATE.pendingAckIncomingMessages.remove(entry.getKey()));
          deadlineItr.remove();
        }
      }
    }
  }

  @Override
  public void close() {
  }

  /**
   * Simulate publishing: each message must be one of the remaining expected messages, except that
   * a message registered as failing throws on its first attempt.
   *
   * @return the number of messages in {@code outgoingMessages}
   * @throws RuntimeException to simulate a transient failure for a registered failing message
   * @throws IllegalStateException if not in publish mode, the topic is unexpected, or a message
   *     is unexpected
   */
  @Override
  public int publish(
      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
    synchronized (STATE) {
      checkState(inPublishMode(), "Can only publish in publish mode");
      checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
                 STATE.expectedTopic);
      for (OutgoingMessage outgoingMessage : outgoingMessages) {
        if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
          throw new RuntimeException("Simulating failure for " + outgoingMessage);
        }
        checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
                   "Unexpected outgoing message %s", outgoingMessage);
      }
      return outgoingMessages.size();
    }
  }

  /**
   * Simulate pulling: move pending messages into the awaiting-ACK set, stamping each with the
   * request time and an ACK deadline of request time plus the simulated timeout.
   *
   * @throws IllegalStateException if not in pull mode, the request time differs from the
   *     simulated clock, the subscription is unexpected, or {@code returnImmediately} is false
   */
  @Override
  public List<IncomingMessage> pull(
      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
      boolean returnImmediately) throws IOException {
    synchronized (STATE) {
      checkState(inPullMode(), "Can only pull in pull mode");
      long now = STATE.clock.currentTimeMillis();
      checkState(requestTimeMsSinceEpoch == now,
                 "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
      checkState(subscription.equals(STATE.expectedSubscription),
                 "Subscription %s does not match expected %s", subscription,
                 STATE.expectedSubscription);
      checkState(returnImmediately, "Pull only supported if returning immediately");

      List<IncomingMessage> incomingMessages = new ArrayList<>();
      Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
      while (pendItr.hasNext()) {
        IncomingMessage incomingMessage = pendItr.next();
        pendItr.remove();
        IncomingMessage incomingMessageWithRequestTime =
            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
        incomingMessages.add(incomingMessageWithRequestTime);
        STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
                                             incomingMessageWithRequestTime);
        // 1000L forces the seconds-to-millis conversion into long arithmetic.
        STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
                              requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000L);
        if (incomingMessages.size() >= batchSize) {
          break;
        }
      }
      return incomingMessages;
    }
  }

  /**
   * Simulate ACKing: every id must belong to a message currently awaiting an ACK.
   *
   * @throws IllegalStateException if not in pull mode, the subscription is unexpected, or an id
   *     is not awaiting an ACK
   */
  @Override
  public void acknowledge(
      SubscriptionPath subscription,
      List<String> ackIds) throws IOException {
    synchronized (STATE) {
      checkState(inPullMode(), "Can only acknowledge in pull mode");
      checkState(subscription.equals(STATE.expectedSubscription),
                 "Subscription %s does not match expected %s", subscription,
                 STATE.expectedSubscription);

      for (String ackId : ackIds) {
        checkState(STATE.ackDeadline.remove(ackId) != null,
                   "No message with ACK id %s is waiting for an ACK", ackId);
        checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
                   "No message with ACK id %s is waiting for an ACK", ackId);
      }
    }
  }

  /**
   * Simulate extending or abandoning ACK deadlines. A positive {@code deadlineSeconds} restarts
   * the deadline from the simulated clock's current time; otherwise the message is returned to
   * the pending list for re-pulling.
   *
   * @throws IllegalStateException if not in pull mode, the subscription is unexpected, or an id
   *     is not awaiting an ACK
   */
  @Override
  public void modifyAckDeadline(
      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
    synchronized (STATE) {
      checkState(inPullMode(), "Can only modify ack deadline in pull mode");
      checkState(subscription.equals(STATE.expectedSubscription),
                 "Subscription %s does not match expected %s", subscription,
                 STATE.expectedSubscription);

      for (String ackId : ackIds) {
        // Either way the old deadline is discarded first.
        checkState(STATE.ackDeadline.remove(ackId) != null,
                   "No message with ACK id %s is waiting for an ACK", ackId);
        if (deadlineSeconds > 0) {
          checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
                     "No message with ACK id %s is waiting for an ACK", ackId);
          // 1000L forces the seconds-to-millis conversion into long arithmetic.
          STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000L);
        } else {
          IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
          checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
          STATE.remainingPendingIncomingMessages.add(message);
        }
      }
    }
  }

  @Override
  public void createTopic(TopicPath topic) throws IOException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void deleteTopic(TopicPath topic) throws IOException {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void createSubscription(
      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<SubscriptionPath> listSubscriptions(
      ProjectPath project, TopicPath topic) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Return the simulated ACK timeout, in seconds. */
  @Override
  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
    synchronized (STATE) {
      return STATE.ackTimeoutSec;
    }
  }

  /**
   * Return true once no messages remain to be pulled.
   *
   * @throws IllegalStateException if not in pull mode
   */
  @Override
  public boolean isEOF() {
    synchronized (STATE) {
      checkState(inPullMode(), "Can only check EOF in pull mode");
      return STATE.remainingPendingIncomingMessages.isEmpty();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/pom.xml b/integration/java/pom.xml new file mode 100644 index 0000000..dcad4c3 --- /dev/null +++ b/integration/java/pom.xml @@ -0,0 +1,37 @@ +<?xml
version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-integration-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-integration-java-parent</artifactId> + <packaging>pom</packaging> + <name>Apache Beam :: Integration Tests :: Java</name> + + <modules> + <module>nexmark</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/pom.xml ---------------------------------------------------------------------- diff --git a/integration/pom.xml b/integration/pom.xml new file mode 100644 index 0000000..4839da5 --- /dev/null +++ b/integration/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-integration-parent</artifactId> + <packaging>pom</packaging> + <name>Apache Beam :: Integration Tests</name> + + <modules> + <module>java</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c92d391..bddbf1f 100644 --- a/pom.xml +++ b/pom.xml @@ -187,7 +187,7 @@ <module>sdks</module> <module>runners</module> <module>examples</module> - <module>integration/java/nexmark</module> + <module>integration</module> <!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. --> <module>sdks/java/javadoc</module> </modules>
