http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java deleted file mode 100644 index 4a6ddac..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.auth.Credentials; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; -import com.google.pubsub.v1.AcknowledgeRequest; -import com.google.pubsub.v1.DeleteSubscriptionRequest; -import com.google.pubsub.v1.DeleteTopicRequest; -import com.google.pubsub.v1.GetSubscriptionRequest; -import com.google.pubsub.v1.ListSubscriptionsRequest; -import com.google.pubsub.v1.ListSubscriptionsResponse; -import com.google.pubsub.v1.ListTopicsRequest; -import com.google.pubsub.v1.ListTopicsResponse; -import com.google.pubsub.v1.ModifyAckDeadlineRequest; -import com.google.pubsub.v1.PublishRequest; -import com.google.pubsub.v1.PublishResponse; -import com.google.pubsub.v1.PublisherGrpc; -import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub; -import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.PullRequest; -import com.google.pubsub.v1.PullResponse; -import com.google.pubsub.v1.ReceivedMessage; -import com.google.pubsub.v1.SubscriberGrpc; -import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub; -import com.google.pubsub.v1.Subscription; -import com.google.pubsub.v1.Topic; -import io.grpc.Channel; -import io.grpc.ClientInterceptors; -import io.grpc.ManagedChannel; -import io.grpc.auth.ClientAuthInterceptor; -import io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.PubsubOptions; - -/** - * A helper class for talking to Pubsub via grpc. - * - * <p>CAUTION: Currently uses the application default credentials and does not respect any - * credentials-related arguments in {@link GcpOptions}. - */ -public class PubsubGrpcClient extends PubsubClient { - private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; - private static final int PUBSUB_PORT = 443; - private static final int LIST_BATCH_SIZE = 1000; - - private static final int DEFAULT_TIMEOUT_S = 15; - - private static class PubsubGrpcClientFactory implements PubsubClientFactory { - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - ManagedChannel channel = NettyChannelBuilder - .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .build(); - - return new PubsubGrpcClient(timestampLabel, - idLabel, - DEFAULT_TIMEOUT_S, - channel, - options.getGcpCredential()); - } - - @Override - public String getKind() { - return "Grpc"; - } - } - - /** - * Factory for creating Pubsub clients using gRCP transport. - */ - public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory(); - - /** - * Timeout for grpc calls (in s). - */ - private final int timeoutSec; - - /** - * Underlying netty channel, or {@literal null} if closed. - */ - @Nullable - private ManagedChannel publisherChannel; - - /** - * Credentials determined from options and environment. - */ - private final Credentials credentials; - - /** - * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time - * instead. - */ - @Nullable - private final String timestampLabel; - - /** - * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. - */ - @Nullable - private final String idLabel; - - - /** - * Cached stubs, or null if not cached. - */ - @Nullable - private PublisherGrpc.PublisherBlockingStub cachedPublisherStub; - private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub; - - @VisibleForTesting - PubsubGrpcClient( - @Nullable String timestampLabel, - @Nullable String idLabel, - int timeoutSec, - ManagedChannel publisherChannel, - Credentials credentials) { - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.timeoutSec = timeoutSec; - this.publisherChannel = publisherChannel; - this.credentials = credentials; - } - - /** - * Gracefully close the underlying netty channel. - */ - @Override - public void close() { - if (publisherChannel == null) { - // Already closed. - return; - } - // Can gc the underlying stubs. - cachedPublisherStub = null; - cachedSubscriberStub = null; - // Mark the client as having been closed before going further - // in case we have an exception from the channel. - ManagedChannel publisherChannel = this.publisherChannel; - this.publisherChannel = null; - // Gracefully shutdown the channel. - publisherChannel.shutdown(); - try { - publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // Ignore. - Thread.currentThread().interrupt(); - } - } - - /** - * Return channel with interceptor for returning credentials. - */ - private Channel newChannel() throws IOException { - checkState(publisherChannel != null, "PubsubGrpcClient has been closed"); - ClientAuthInterceptor interceptor = - new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor()); - return ClientInterceptors.intercept(publisherChannel, interceptor); - } - - /** - * Return a stub for making a publish request with a timeout. - */ - private PublisherBlockingStub publisherStub() throws IOException { - if (cachedPublisherStub == null) { - cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel()); - } - return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); - } - - /** - * Return a stub for making a subscribe request with a timeout. - */ - private SubscriberBlockingStub subscriberStub() throws IOException { - if (cachedSubscriberStub == null) { - cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel()); - } - return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); - } - - @Override - public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) - throws IOException { - PublishRequest.Builder request = PublishRequest.newBuilder() - .setTopic(topic.getPath()); - for (OutgoingMessage outgoingMessage : outgoingMessages) { - PubsubMessage.Builder message = - PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(outgoingMessage.elementBytes)); - - if (outgoingMessage.attributes != null) { - message.putAllAttributes(outgoingMessage.attributes); - } - - if (timestampLabel != null) { - message.getMutableAttributes() - .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); - } - - if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { - message.getMutableAttributes().put(idLabel, outgoingMessage.recordId); - } - - request.addMessages(message); - } - - PublishResponse response = publisherStub().publish(request.build()); - return response.getMessageIdsCount(); - } - - @Override - public List<IncomingMessage> pull( - long requestTimeMsSinceEpoch, - SubscriptionPath subscription, - int batchSize, - boolean returnImmediately) throws IOException { - PullRequest request = PullRequest.newBuilder() - .setSubscription(subscription.getPath()) - .setReturnImmediately(returnImmediately) - .setMaxMessages(batchSize) - .build(); - PullResponse response = subscriberStub().pull(request); - if (response.getReceivedMessagesCount() == 0) { - return ImmutableList.of(); - } - List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount()); - for (ReceivedMessage message : response.getReceivedMessagesList()) { - PubsubMessage pubsubMessage = message.getMessage(); - @Nullable Map<String, String> attributes = pubsubMessage.getAttributes(); - - // Payload. - byte[] elementBytes = pubsubMessage.getData().toByteArray(); - - // Timestamp. - String pubsubTimestampString = null; - Timestamp timestampProto = pubsubMessage.getPublishTime(); - if (timestampProto != null) { - pubsubTimestampString = String.valueOf(timestampProto.getSeconds() - + timestampProto.getNanos() / 1000L); - } - long timestampMsSinceEpoch = - extractTimestamp(timestampLabel, pubsubTimestampString, attributes); - - // Ack id. - String ackId = message.getAckId(); - checkState(!Strings.isNullOrEmpty(ackId)); - - // Record id, if any. - @Nullable String recordId = null; - if (idLabel != null && attributes != null) { - recordId = attributes.get(idLabel); - } - if (Strings.isNullOrEmpty(recordId)) { - // Fall back to the Pubsub provided message id. - recordId = pubsubMessage.getMessageId(); - } - - incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, - requestTimeMsSinceEpoch, ackId, recordId)); - } - return incomingMessages; - } - - @Override - public void acknowledge(SubscriptionPath subscription, List<String> ackIds) - throws IOException { - AcknowledgeRequest request = AcknowledgeRequest.newBuilder() - .setSubscription(subscription.getPath()) - .addAllAckIds(ackIds) - .build(); - subscriberStub().acknowledge(request); // ignore Empty result. - } - - @Override - public void modifyAckDeadline( - SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) - throws IOException { - ModifyAckDeadlineRequest request = - ModifyAckDeadlineRequest.newBuilder() - .setSubscription(subscription.getPath()) - .addAllAckIds(ackIds) - .setAckDeadlineSeconds(deadlineSeconds) - .build(); - subscriberStub().modifyAckDeadline(request); // ignore Empty result. - } - - @Override - public void createTopic(TopicPath topic) throws IOException { - Topic request = Topic.newBuilder() - .setName(topic.getPath()) - .build(); - publisherStub().createTopic(request); // ignore Topic result. - } - - @Override - public void deleteTopic(TopicPath topic) throws IOException { - DeleteTopicRequest request = DeleteTopicRequest.newBuilder() - .setTopic(topic.getPath()) - .build(); - publisherStub().deleteTopic(request); // ignore Empty result. - } - - @Override - public List<TopicPath> listTopics(ProjectPath project) throws IOException { - ListTopicsRequest.Builder request = - ListTopicsRequest.newBuilder() - .setProject(project.getPath()) - .setPageSize(LIST_BATCH_SIZE); - ListTopicsResponse response = publisherStub().listTopics(request.build()); - if (response.getTopicsCount() == 0) { - return ImmutableList.of(); - } - List<TopicPath> topics = new ArrayList<>(response.getTopicsCount()); - while (true) { - for (Topic topic : response.getTopicsList()) { - topics.add(topicPathFromPath(topic.getName())); - } - if (response.getNextPageToken().isEmpty()) { - break; - } - request.setPageToken(response.getNextPageToken()); - response = publisherStub().listTopics(request.build()); - } - return topics; - } - - @Override - public void createSubscription( - TopicPath topic, SubscriptionPath subscription, - int ackDeadlineSeconds) throws IOException { - Subscription request = Subscription.newBuilder() - .setTopic(topic.getPath()) - .setName(subscription.getPath()) - .setAckDeadlineSeconds(ackDeadlineSeconds) - .build(); - subscriberStub().createSubscription(request); // ignore Subscription result. - } - - @Override - public void deleteSubscription(SubscriptionPath subscription) throws IOException { - DeleteSubscriptionRequest request = - DeleteSubscriptionRequest.newBuilder() - .setSubscription(subscription.getPath()) - .build(); - subscriberStub().deleteSubscription(request); // ignore Empty result. - } - - @Override - public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) - throws IOException { - ListSubscriptionsRequest.Builder request = - ListSubscriptionsRequest.newBuilder() - .setProject(project.getPath()) - .setPageSize(LIST_BATCH_SIZE); - ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build()); - if (response.getSubscriptionsCount() == 0) { - return ImmutableList.of(); - } - List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount()); - while (true) { - for (Subscription subscription : response.getSubscriptionsList()) { - if (subscription.getTopic().equals(topic.getPath())) { - subscriptions.add(subscriptionPathFromPath(subscription.getName())); - } - } - if (response.getNextPageToken().isEmpty()) { - break; - } - request.setPageToken(response.getNextPageToken()); - response = subscriberStub().listSubscriptions(request.build()); - } - return subscriptions; - } - - @Override - public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { - GetSubscriptionRequest request = - GetSubscriptionRequest.newBuilder() - .setSubscription(subscription.getPath()) - .build(); - Subscription response = subscriberStub().getSubscription(request); - return response.getAckDeadlineSeconds(); - } - - @Override - public boolean isEOF() { - return false; - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java deleted file mode 100644 index ef8abfd..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.Pubsub.Builder; -import com.google.api.services.pubsub.model.AcknowledgeRequest; -import com.google.api.services.pubsub.model.ListSubscriptionsResponse; -import com.google.api.services.pubsub.model.ListTopicsResponse; -import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PublishResponse; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.api.services.pubsub.model.PullRequest; -import com.google.api.services.pubsub.model.PullResponse; -import com.google.api.services.pubsub.model.ReceivedMessage; -import com.google.api.services.pubsub.model.Subscription; -import com.google.api.services.pubsub.model.Topic; -import com.google.auth.Credentials; -import com.google.auth.http.HttpCredentialsAdapter; -import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PubsubOptions; - -/** - * A Pubsub client using JSON transport. - */ -public class PubsubJsonClient extends PubsubClient { - - private static class PubsubJsonClientFactory implements PubsubClientFactory { - private static HttpRequestInitializer chainHttpRequestInitializer( - Credentials credential, HttpRequestInitializer httpRequestInitializer) { - if (credential == null) { - return httpRequestInitializer; - } else { - return new ChainingHttpRequestInitializer( - new HttpCredentialsAdapter(credential), - httpRequestInitializer); - } - } - - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - Pubsub pubsub = new Builder( - Transport.getTransport(), - Transport.getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setRootUrl(options.getPubsubRootUrl()) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) - .build(); - return new PubsubJsonClient(timestampLabel, idLabel, pubsub); - } - - @Override - public String getKind() { - return "Json"; - } - } - - /** - * Factory for creating Pubsub clients using Json transport. - */ - public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory(); - - /** - * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time - * instead. - */ - @Nullable - private final String timestampLabel; - - /** - * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. - */ - @Nullable - private final String idLabel; - - /** - * Underlying JSON transport. - */ - private Pubsub pubsub; - - @VisibleForTesting - PubsubJsonClient( - @Nullable String timestampLabel, - @Nullable String idLabel, - Pubsub pubsub) { - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.pubsub = pubsub; - } - - @Override - public void close() { - // Nothing to close. - } - - @Override - public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) - throws IOException { - List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size()); - for (OutgoingMessage outgoingMessage : outgoingMessages) { - PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); - - Map<String, String> attributes = outgoingMessage.attributes; - if ((timestampLabel != null || idLabel != null) && attributes == null) { - attributes = new TreeMap<>(); - } - if (attributes != null) { - pubsubMessage.setAttributes(attributes); - } - - if (timestampLabel != null) { - attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); - } - - if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { - attributes.put(idLabel, outgoingMessage.recordId); - } - - pubsubMessages.add(pubsubMessage); - } - PublishRequest request = new PublishRequest().setMessages(pubsubMessages); - PublishResponse response = pubsub.projects() - .topics() - .publish(topic.getPath(), request) - .execute(); - return response.getMessageIds().size(); - } - - @Override - public List<IncomingMessage> pull( - long requestTimeMsSinceEpoch, - SubscriptionPath subscription, - int batchSize, - boolean returnImmediately) throws IOException { - PullRequest request = new PullRequest() - .setReturnImmediately(returnImmediately) - .setMaxMessages(batchSize); - PullResponse response = pubsub.projects() - .subscriptions() - .pull(subscription.getPath(), request) - .execute(); - if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) { - return ImmutableList.of(); - } - List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size()); - for (ReceivedMessage message : response.getReceivedMessages()) { - PubsubMessage pubsubMessage = message.getMessage(); - @Nullable Map<String, String> attributes = pubsubMessage.getAttributes(); - - // Payload. - byte[] elementBytes = pubsubMessage.decodeData(); - - // Timestamp. - long timestampMsSinceEpoch = - extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes); - - // Ack id. - String ackId = message.getAckId(); - checkState(!Strings.isNullOrEmpty(ackId)); - - // Record id, if any. - @Nullable String recordId = null; - if (idLabel != null && attributes != null) { - recordId = attributes.get(idLabel); - } - if (Strings.isNullOrEmpty(recordId)) { - // Fall back to the Pubsub provided message id. - recordId = pubsubMessage.getMessageId(); - } - - incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, - requestTimeMsSinceEpoch, ackId, recordId)); - } - - return incomingMessages; - } - - @Override - public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException { - AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds); - pubsub.projects() - .subscriptions() - .acknowledge(subscription.getPath(), request) - .execute(); // ignore Empty result. - } - - @Override - public void modifyAckDeadline( - SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) - throws IOException { - ModifyAckDeadlineRequest request = - new ModifyAckDeadlineRequest().setAckIds(ackIds) - .setAckDeadlineSeconds(deadlineSeconds); - pubsub.projects() - .subscriptions() - .modifyAckDeadline(subscription.getPath(), request) - .execute(); // ignore Empty result. - } - - @Override - public void createTopic(TopicPath topic) throws IOException { - pubsub.projects() - .topics() - .create(topic.getPath(), new Topic()) - .execute(); // ignore Topic result. - } - - @Override - public void deleteTopic(TopicPath topic) throws IOException { - pubsub.projects() - .topics() - .delete(topic.getPath()) - .execute(); // ignore Empty result. - } - - @Override - public List<TopicPath> listTopics(ProjectPath project) throws IOException { - ListTopicsResponse response = pubsub.projects() - .topics() - .list(project.getPath()) - .execute(); - if (response.getTopics() == null || response.getTopics().isEmpty()) { - return ImmutableList.of(); - } - List<TopicPath> topics = new ArrayList<>(response.getTopics().size()); - for (Topic topic : response.getTopics()) { - topics.add(topicPathFromPath(topic.getName())); - } - return topics; - } - - @Override - public void createSubscription( - TopicPath topic, SubscriptionPath subscription, - int ackDeadlineSeconds) throws IOException { - Subscription request = new Subscription() - .setTopic(topic.getPath()) - .setAckDeadlineSeconds(ackDeadlineSeconds); - pubsub.projects() - .subscriptions() - .create(subscription.getPath(), request) - .execute(); // ignore Subscription result. - } - - @Override - public void deleteSubscription(SubscriptionPath subscription) throws IOException { - pubsub.projects() - .subscriptions() - .delete(subscription.getPath()) - .execute(); // ignore Empty result. - } - - @Override - public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) - throws IOException { - ListSubscriptionsResponse response = pubsub.projects() - .subscriptions() - .list(project.getPath()) - .execute(); - if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) { - return ImmutableList.of(); - } - List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size()); - for (Subscription subscription : response.getSubscriptions()) { - if (subscription.getTopic().equals(topic.getPath())) { - subscriptions.add(subscriptionPathFromPath(subscription.getName())); - } - } - return subscriptions; - } - - @Override - public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { - Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute(); - return response.getAckDeadlineSeconds(); - } - - @Override - public boolean isEOF() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java deleted file mode 100644 index 61479f9..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.client.util.Clock; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PubsubOptions; - -/** - * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for - * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline} - * methods. Relies on statics to mimic the Pubsub service, though we try to hide that. - */ -public class PubsubTestClient extends PubsubClient implements Serializable { - /** - * Mimic the state of the simulated Pubsub 'service'. - * - * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running - * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created - * from the same client factory and run in parallel. Thus we can't enforce aliasing of the - * following data structures over all clients and must resort to a static. - */ - private static class State { - /** - * True if has been primed for a test but not yet validated. - */ - boolean isActive; - - /** - * Publish mode only: Only publish calls for this topic are allowed. - */ - @Nullable - TopicPath expectedTopic; - - /** - * Publish mode only: Messages yet to seen in a {@link #publish} call. - */ - @Nullable - Set<OutgoingMessage> remainingExpectedOutgoingMessages; - - /** - * Publish mode only: Messages which should throw when first sent to simulate transient publish - * failure. - */ - @Nullable - Set<OutgoingMessage> remainingFailingOutgoingMessages; - - /** - * Pull mode only: Clock from which to get current time. - */ - @Nullable - Clock clock; - - /** - * Pull mode only: Only pull calls for this subscription are allowed. - */ - @Nullable - SubscriptionPath expectedSubscription; - - /** - * Pull mode only: Timeout to simulate. - */ - int ackTimeoutSec; - - /** - * Pull mode only: Messages waiting to be received by a {@link #pull} call. - */ - @Nullable - List<IncomingMessage> remainingPendingIncomingMessages; - - /** - * Pull mode only: Messages which have been returned from a {@link #pull} call and - * not yet ACKed by an {@link #acknowledge} call. - */ - @Nullable - Map<String, IncomingMessage> pendingAckIncomingMessages; - - /** - * Pull mode only: When above messages are due to have their ACK deadlines expire. - */ - @Nullable - Map<String, Long> ackDeadline; - } - - private static final State STATE = new State(); - - /** Closing the factory will validate all expected messages were processed. */ - public interface PubsubTestClientFactory - extends PubsubClientFactory, Closeable, Serializable { - } - - /** - * Return a factory for testing publishers. Only one factory may be in-flight at a time. - * The factory must be closed when the test is complete, at which point final validation will - * occur. - */ - public static PubsubTestClientFactory createFactoryForPublish( - final TopicPath expectedTopic, - final Iterable<OutgoingMessage> expectedOutgoingMessages, - final Iterable<OutgoingMessage> failingOutgoingMessages) { - synchronized (STATE) { - checkState(!STATE.isActive, "Test still in flight"); - STATE.expectedTopic = expectedTopic; - STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages); - STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages); - STATE.isActive = true; - } - return new PubsubTestClientFactory() { - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - return new PubsubTestClient(); - } - - @Override - public String getKind() { - return "PublishTest"; - } - - @Override - public void close() { - synchronized (STATE) { - checkState(STATE.isActive, "No test still in flight"); - checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(), - "Still waiting for %s messages to be published", - STATE.remainingExpectedOutgoingMessages.size()); - STATE.isActive = false; - STATE.remainingExpectedOutgoingMessages = null; - } - } - }; - } - - /** - * Return a factory for testing subscribers. Only one factory may be in-flight at a time. - * The factory must be closed when the test in complete - */ - public static PubsubTestClientFactory createFactoryForPull( - final Clock clock, - final SubscriptionPath expectedSubscription, - final int ackTimeoutSec, - final Iterable<IncomingMessage> expectedIncomingMessages) { - synchronized (STATE) { - checkState(!STATE.isActive, "Test still in flight"); - STATE.clock = clock; - STATE.expectedSubscription = expectedSubscription; - STATE.ackTimeoutSec = ackTimeoutSec; - STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages); - STATE.pendingAckIncomingMessages = new HashMap<>(); - STATE.ackDeadline = new HashMap<>(); - STATE.isActive = true; - } - return new PubsubTestClientFactory() { - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - return new PubsubTestClient(); - } - - @Override - public String getKind() { - return "PullTest"; - } - - @Override - public void close() { - synchronized (STATE) { - checkState(STATE.isActive, "No test still in flight"); - checkState(STATE.remainingPendingIncomingMessages.isEmpty(), - "Still waiting for %s messages to be pulled", - STATE.remainingPendingIncomingMessages.size()); - checkState(STATE.pendingAckIncomingMessages.isEmpty(), - "Still waiting for %s messages to be ACKed", - STATE.pendingAckIncomingMessages.size()); - checkState(STATE.ackDeadline.isEmpty(), - "Still waiting for %s messages to be ACKed", - STATE.ackDeadline.size()); - STATE.isActive = false; - STATE.remainingPendingIncomingMessages = null; - STATE.pendingAckIncomingMessages = null; - STATE.ackDeadline = null; - } - } - }; - } - - public static PubsubTestClientFactory createFactoryForCreateSubscription() { - return new PubsubTestClientFactory() { - int numCalls = 0; - - @Override - public void close() throws IOException { - checkState( - numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls); - } - - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - return new PubsubTestClient() { - @Override - public void createSubscription( - TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) - throws IOException { - checkState(numCalls == 0, "Expected at most one subscription to be created"); - numCalls++; - } - }; - } - - @Override - public String getKind() { - return "CreateSubscriptionTest"; - } - }; - } - - /** - * Return true if in pull mode. - */ - private boolean inPullMode() { - checkState(STATE.isActive, "No test is active"); - return STATE.expectedSubscription != null; - } - - /** - * Return true if in publish mode. - */ - private boolean inPublishMode() { - checkState(STATE.isActive, "No test is active"); - return STATE.expectedTopic != null; - } - - /** - * For subscription mode only: - * Track progression of time according to the {@link Clock} passed . This will simulate Pubsub - * expiring - * outstanding ACKs. - */ - public void advance() { - synchronized (STATE) { - checkState(inPullMode(), "Can only advance in pull mode"); - // Any messages who's ACKs timed out are available for re-pulling. - Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator(); - while (deadlineItr.hasNext()) { - Map.Entry<String, Long> entry = deadlineItr.next(); - if (entry.getValue() <= STATE.clock.currentTimeMillis()) { - STATE.remainingPendingIncomingMessages.add( - STATE.pendingAckIncomingMessages.remove(entry.getKey())); - deadlineItr.remove(); - } - } - } - } - - @Override - public void close() { - } - - @Override - public int publish( - TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException { - synchronized (STATE) { - checkState(inPublishMode(), "Can only publish in publish mode"); - checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic, - STATE.expectedTopic); - for (OutgoingMessage outgoingMessage : outgoingMessages) { - if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) { - throw new RuntimeException("Simulating failure for " + outgoingMessage); - } - checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage), - "Unexpected outgoing message %s", outgoingMessage); - } - return outgoingMessages.size(); - } - } - - @Override - public List<IncomingMessage> pull( - long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize, - boolean returnImmediately) throws IOException { - synchronized (STATE) { - checkState(inPullMode(), "Can only pull in pull mode"); - long now = STATE.clock.currentTimeMillis(); - checkState(requestTimeMsSinceEpoch == now, - "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch); - checkState(subscription.equals(STATE.expectedSubscription), - "Subscription %s does not match expected %s", subscription, - STATE.expectedSubscription); - checkState(returnImmediately, "Pull only supported if returning immediately"); - - List<IncomingMessage> incomingMessages = new ArrayList<>(); - Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator(); - while (pendItr.hasNext()) { - IncomingMessage incomingMessage = pendItr.next(); - pendItr.remove(); - IncomingMessage incomingMessageWithRequestTime = - incomingMessage.withRequestTime(requestTimeMsSinceEpoch); - incomingMessages.add(incomingMessageWithRequestTime); - STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId, - incomingMessageWithRequestTime); - STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId, - requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000); - if (incomingMessages.size() >= batchSize) { - break; - } - } - return incomingMessages; - } - } - - @Override - public void acknowledge( - SubscriptionPath subscription, - List<String> ackIds) throws IOException { - synchronized (STATE) { - checkState(inPullMode(), "Can only acknowledge in pull mode"); - checkState(subscription.equals(STATE.expectedSubscription), - "Subscription %s does not match expected %s", subscription, - STATE.expectedSubscription); - - for (String ackId : ackIds) { - checkState(STATE.ackDeadline.remove(ackId) != null, - "No message with ACK id %s is waiting for an ACK", ackId); - checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null, - "No message with ACK id %s is waiting for an ACK", ackId); - } - } - } - - @Override - public void modifyAckDeadline( - SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException { - synchronized (STATE) { - checkState(inPullMode(), "Can only modify ack deadline in pull mode"); - checkState(subscription.equals(STATE.expectedSubscription), - "Subscription %s does not match expected %s", subscription, - STATE.expectedSubscription); - - for (String ackId : ackIds) { - if (deadlineSeconds > 0) { - checkState(STATE.ackDeadline.remove(ackId) != null, - "No message with ACK id %s is waiting for an ACK", ackId); - checkState(STATE.pendingAckIncomingMessages.containsKey(ackId), - "No message with ACK id %s is waiting for an ACK", ackId); - STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000); - } else { - checkState(STATE.ackDeadline.remove(ackId) != null, - "No message with ACK id %s is waiting for an ACK", ackId); - IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId); - checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId); - STATE.remainingPendingIncomingMessages.add(message); - } - } - } - } - - @Override - public void createTopic(TopicPath topic) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void deleteTopic(TopicPath topic) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public List<TopicPath> listTopics(ProjectPath project) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void createSubscription( - TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void deleteSubscription(SubscriptionPath subscription) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public List<SubscriptionPath> listSubscriptions( - ProjectPath project, TopicPath topic) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { - synchronized (STATE) { - return STATE.ackTimeoutSec; - } - } - - @Override - public boolean isEOF() { - synchronized (STATE) { - checkState(inPullMode(), "Can only check EOF in pull mode"); - return STATE.remainingPendingIncomingMessages.isEmpty(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java index 1edfa1d..80c093b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -107,8 +107,7 @@ public class Transport { /** * Returns a Pubsub client builder using the specified {@link PubsubOptions}. * - * @deprecated Use an appropriate - * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory} + * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory */ @Deprecated public static Pubsub.Builder http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java deleted file mode 100644 index c996409..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; - -import java.util.Set; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.testing.UsesUnboundedPCollections; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.joda.time.Duration; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for PubsubIO Read and Write transforms. - */ -@RunWith(JUnit4.class) -public class PubsubIOTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testPubsubIOGetName() { - assertEquals("PubsubIO.Read", - PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName()); - assertEquals("PubsubIO.Write", - PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName()); - } - - @Test - public void testTopicValidationSuccess() throws Exception { - PubsubIO.<String>read().topic("projects/my-project/topics/abc"); - PubsubIO.<String>read().topic("projects/my-project/topics/ABC"); - PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF"); - PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234"); - PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc"); - PubsubIO.<String>read().topic(new StringBuilder() - .append("projects/my-project/topics/A-really-long-one-") - .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append("11111111111111111111111111111111111111111111111111111111111111111111111111") - .toString()); - } - - @Test - public void testTopicValidationBadCharacter() throws Exception { - thrown.expect(IllegalArgumentException.class); - PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc"); - } - - @Test - public void testTopicValidationTooLong() throws Exception { - thrown.expect(IllegalArgumentException.class); - PubsubIO.<String>read().topic(new StringBuilder().append - ("projects/my-project/topics/A-really-long-one-") - .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append("1111111111111111111111111111111111111111111111111111111111111111111111111111") - .toString()); - } - - @Test - public void testReadTopicDisplayData() { - String topic = "projects/project/topics/topic"; - String subscription = "projects/project/subscriptions/subscription"; - Duration maxReadTime = Duration.standardMinutes(5); - PubsubIO.Read<String> read = PubsubIO.<String>read() - .topic(StaticValueProvider.of(topic)) - .timestampLabel("myTimestamp") - .idLabel("myId"); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("topic", topic)); - assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); - assertThat(displayData, hasDisplayItem("idLabel", "myId")); - } - - @Test - public void testReadSubscriptionDisplayData() { - String topic = "projects/project/topics/topic"; - String subscription = "projects/project/subscriptions/subscription"; - Duration maxReadTime = Duration.standardMinutes(5); - PubsubIO.Read<String> read = PubsubIO.<String>read() - .subscription(StaticValueProvider.of(subscription)) - .timestampLabel("myTimestamp") - .idLabel("myId"); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("subscription", subscription)); - assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); - assertThat(displayData, hasDisplayItem("idLabel", "myId")); - } - - @Test - public void testNullTopic() { - String subscription = "projects/project/subscriptions/subscription"; - PubsubIO.Read<String> read = PubsubIO.<String>read() - .subscription(StaticValueProvider.of(subscription)); - assertNull(read.getTopic()); - assertNotNull(read.getSubscription()); - assertNotNull(DisplayData.from(read)); - } - - @Test - public void testNullSubscription() { - String topic = "projects/project/topics/topic"; - PubsubIO.Read<String> read = PubsubIO.<String>read() - .topic(StaticValueProvider.of(topic)); - assertNotNull(read.getTopic()); - assertNull(read.getSubscription()); - assertNotNull(DisplayData.from(read)); - } - - @Test - @Category({ValidatesRunner.class, UsesUnboundedPCollections.class}) - public void testPrimitiveReadDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - Set<DisplayData> displayData; - PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of()); - - // Reading from a subscription. - read = read.subscription("projects/project/subscriptions/subscription"); - displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); - assertThat("PubsubIO.Read should include the subscription in its primitive display data", - displayData, hasItem(hasDisplayItem("subscription"))); - - // Reading from a topic. - read = read.topic("projects/project/topics/topic"); - displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); - assertThat("PubsubIO.Read should include the topic in its primitive display data", - displayData, hasItem(hasDisplayItem("topic"))); - } - - @Test - public void testWriteDisplayData() { - String topic = "projects/project/topics/topic"; - PubsubIO.Write<?> write = PubsubIO.<String>write() - .topic(topic) - .timestampLabel("myTimestamp") - .idLabel("myId"); - - DisplayData displayData = DisplayData.from(write); - - assertThat(displayData, hasDisplayItem("topic", topic)); - assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); - assertThat(displayData, hasDisplayItem("idLabel", "myId")); - } - - @Test - @Category(ValidatesRunner.class) - public void testPrimitiveWriteDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic"); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("PubsubIO.Write should include the topic in its primitive display data", - displayData, hasItem(hasDisplayItem("topic"))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java deleted file mode 100644 index 7a4be62..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.hash.Hashing; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.PubsubTestClient; -import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test PubsubUnboundedSink. - */ -@RunWith(JUnit4.class) -public class PubsubUnboundedSinkTest implements Serializable { - private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); - private static final String DATA = "testData"; - private static final Map<String, String> ATTRIBUTES = - ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build(); - private static final long TIMESTAMP = 1234L; - private static final String TIMESTAMP_LABEL = "timestamp"; - private static final String ID_LABEL = "id"; - private static final int NUM_SHARDS = 10; - - private static class Stamp extends DoFn<String, String> { - @ProcessElement - public void processElement(ProcessContext c) { - c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP)); - } - } - - private String getRecordId(String data) { - return Hashing.murmur3_128().hashBytes(data.getBytes()).toString(); - } - - @Rule - public transient TestPipeline p = TestPipeline.create(); - - @Test - public void saneCoder() throws Exception { - OutgoingMessage message = new OutgoingMessage( - DATA.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(DATA)); - CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); - CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); - } - - @Test - @Category(NeedsRunner.class) - public void sendOneMessage() throws IOException { - List<OutgoingMessage> outgoing = - ImmutableList.of(new OutgoingMessage( - DATA.getBytes(), - ATTRIBUTES, - TIMESTAMP, getRecordId(DATA))); - int batchSize = 1; - int batchBytes = 1; - try (PubsubTestClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, - ImmutableList.<OutgoingMessage>of())) { - PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), - TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, - Duration.standardSeconds(2), - new SimpleFunction<String, PubsubIO.PubsubMessage>() { - @Override - public PubsubIO.PubsubMessage apply(String input) { - return new PubsubIO.PubsubMessage(input.getBytes(), ATTRIBUTES); - } - }, - RecordIdMethod.DETERMINISTIC); - p.apply(Create.of(ImmutableList.of(DATA))) - .apply(ParDo.of(new Stamp())) - .apply(sink); - p.run(); - } - // The PubsubTestClientFactory will assert fail on close if the actual published - // message does not match the expected publish message. - } - - @Test - @Category(NeedsRunner.class) - public void sendMoreThanOneBatchByNumMessages() throws IOException { - List<OutgoingMessage> outgoing = new ArrayList<>(); - List<String> data = new ArrayList<>(); - int batchSize = 2; - int batchBytes = 1000; - for (int i = 0; i < batchSize * 10; i++) { - String str = String.valueOf(i); - outgoing.add(new OutgoingMessage( - str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str))); - data.add(str); - } - try (PubsubTestClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, - ImmutableList.<OutgoingMessage>of())) { - PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), - TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, - Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC); - p.apply(Create.of(data)) - .apply(ParDo.of(new Stamp())) - .apply(sink); - p.run(); - } - // The PubsubTestClientFactory will assert fail on close if the actual published - // message does not match the expected publish message. - } - - @Test - @Category(NeedsRunner.class) - public void sendMoreThanOneBatchByByteSize() throws IOException { - List<OutgoingMessage> outgoing = new ArrayList<>(); - List<String> data = new ArrayList<>(); - int batchSize = 100; - int batchBytes = 10; - int n = 0; - while (n < batchBytes * 10) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < batchBytes; i++) { - sb.append(String.valueOf(n)); - } - String str = sb.toString(); - outgoing.add(new OutgoingMessage( - str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str))); - data.add(str); - n += str.length(); - } - try (PubsubTestClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, - ImmutableList.<OutgoingMessage>of())) { - PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), - StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), - null, RecordIdMethod.DETERMINISTIC); - p.apply(Create.of(data)) - .apply(ParDo.of(new Stamp())) - .apply(sink); - p.run(); - } - // The PubsubTestClientFactory will assert fail on close if the actual published - // message does not match the expected publish message. - } - - // TODO: We would like to test that failed Pubsub publish calls cause the already assigned - // (and random) record ids to be reused. However that can't be done without the test runnner - // supporting retrying bundles. -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java deleted file mode 100644 index d9df2ca..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java +++ /dev/null @@ -1,411 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io; - -import static junit.framework.TestCase.assertFalse; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import com.google.api.client.util.Clock; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint; -import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader; -import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.PubsubTestClient; -import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; -import org.joda.time.Instant; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test PubsubUnboundedSource. - */ -@RunWith(JUnit4.class) -public class PubsubUnboundedSourceTest { - private static final SubscriptionPath SUBSCRIPTION = - PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); - private static final String DATA = "testData"; - private static final long TIMESTAMP = 1234L; - private static final long REQ_TIME = 6373L; - private static final String TIMESTAMP_LABEL = "timestamp"; - private static final String ID_LABEL = "id"; - private static final String ACK_ID = "testAckId"; - private static final String RECORD_ID = "testRecordId"; - private static final int ACK_TIMEOUT_S = 60; - - private AtomicLong now; - private Clock clock; - private PubsubTestClientFactory factory; - private PubsubSource<String> primSource; - - @Rule - public TestPipeline p = TestPipeline.create(); - - private void setupOneMessage(Iterable<IncomingMessage> incoming) { - now = new AtomicLong(REQ_TIME); - clock = new Clock() { - @Override - public long currentTimeMillis() { - return now.get(); - } - }; - factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming); - PubsubUnboundedSource<String> source = - new PubsubUnboundedSource<>( - clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION), - StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null); - primSource = new PubsubSource<>(source); - } - - private void setupOneMessage() { - setupOneMessage(ImmutableList.of( - new IncomingMessage(DATA.getBytes(), null, TIMESTAMP, 0, ACK_ID, RECORD_ID))); - } - - @After - public void after() throws IOException { - factory.close(); - now = null; - clock = null; - primSource = null; - factory = null; - } - - @Test - public void checkpointCoderIsSane() throws Exception { - setupOneMessage(ImmutableList.<IncomingMessage>of()); - CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder()); - // Since we only serialize/deserialize the 'notYetReadIds', and we don't want to make - // equals on checkpoints ignore those fields, we'll test serialization and deserialization - // of checkpoints in multipleReaders below. - } - - @Test - public void readOneMessage() throws IOException { - setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - // Read one message. - assertTrue(reader.start()); - assertEquals(DATA, reader.getCurrent()); - assertFalse(reader.advance()); - // ACK the message. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); - checkpoint.finalizeCheckpoint(); - reader.close(); - } - - @Test - public void timeoutAckAndRereadOneMessage() throws IOException { - setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); - assertTrue(reader.start()); - assertEquals(DATA, reader.getCurrent()); - // Let the ACK deadline for the above expire. - now.addAndGet(65 * 1000); - pubsubClient.advance(); - // We'll now receive the same message again. - assertTrue(reader.advance()); - assertEquals(DATA, reader.getCurrent()); - assertFalse(reader.advance()); - // Now ACK the message. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); - checkpoint.finalizeCheckpoint(); - reader.close(); - } - - @Test - public void extendAck() throws IOException { - setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); - // Pull the first message but don't take a checkpoint for it. - assertTrue(reader.start()); - assertEquals(DATA, reader.getCurrent()); - // Extend the ack - now.addAndGet(55 * 1000); - pubsubClient.advance(); - assertFalse(reader.advance()); - // Extend the ack again - now.addAndGet(25 * 1000); - pubsubClient.advance(); - assertFalse(reader.advance()); - // Now ACK the message. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); - checkpoint.finalizeCheckpoint(); - reader.close(); - } - - @Test - public void timeoutAckExtensions() throws IOException { - setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); - // Pull the first message but don't take a checkpoint for it. - assertTrue(reader.start()); - assertEquals(DATA, reader.getCurrent()); - // Extend the ack. - now.addAndGet(55 * 1000); - pubsubClient.advance(); - assertFalse(reader.advance()); - // Let the ack expire. - for (int i = 0; i < 3; i++) { - now.addAndGet(25 * 1000); - pubsubClient.advance(); - assertFalse(reader.advance()); - } - // Wait for resend. - now.addAndGet(25 * 1000); - pubsubClient.advance(); - // Reread the same message. - assertTrue(reader.advance()); - assertEquals(DATA, reader.getCurrent()); - // Now ACK the message. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); - checkpoint.finalizeCheckpoint(); - reader.close(); - } - - @Test - public void multipleReaders() throws IOException { - List<IncomingMessage> incoming = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - String data = String.format("data_%d", i); - String ackid = String.format("ackid_%d", i); - incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID)); - } - setupOneMessage(incoming); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - // Consume two messages, only read one. - assertTrue(reader.start()); - assertEquals("data_0", reader.getCurrent()); - - // Grab checkpoint. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); - checkpoint.finalizeCheckpoint(); - assertEquals(1, checkpoint.notYetReadIds.size()); - assertEquals("ackid_1", checkpoint.notYetReadIds.get(0)); - - // Read second message. - assertTrue(reader.advance()); - assertEquals("data_1", reader.getCurrent()); - - // Restore from checkpoint. - byte[] checkpointBytes = - CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint); - checkpoint = CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(), - checkpointBytes); - assertEquals(1, checkpoint.notYetReadIds.size()); - assertEquals("ackid_1", checkpoint.notYetReadIds.get(0)); - - // Re-read second message. - reader = primSource.createReader(p.getOptions(), checkpoint); - assertTrue(reader.start()); - assertEquals("data_1", reader.getCurrent()); - - // We are done. - assertFalse(reader.advance()); - - // ACK final message. - checkpoint = reader.getCheckpointMark(); - checkpoint.finalizeCheckpoint(); - reader.close(); - } - - private long messageNumToTimestamp(int messageNum) { - return TIMESTAMP + messageNum * 100; - } - - @Test - public void readManyMessages() throws IOException { - Map<String, Integer> dataToMessageNum = new HashMap<>(); - - final int m = 97; - final int n = 10000; - List<IncomingMessage> incoming = new ArrayList<>(); - for (int i = 0; i < n; i++) { - // Make the messages timestamps slightly out of order. - int messageNum = ((i / m) * m) + (m - 1) - (i % m); - String data = String.format("data_%d", messageNum); - dataToMessageNum.put(data, messageNum); - String recid = String.format("recordid_%d", messageNum); - String ackId = String.format("ackid_%d", messageNum); - incoming.add(new IncomingMessage(data.getBytes(), null, messageNumToTimestamp(messageNum), 0, - ackId, recid)); - } - setupOneMessage(incoming); - - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); - - for (int i = 0; i < n; i++) { - if (i == 0) { - assertTrue(reader.start()); - } else { - assertTrue(reader.advance()); - } - // We'll checkpoint and ack within the 2min limit. - now.addAndGet(30); - pubsubClient.advance(); - String data = reader.getCurrent(); - Integer messageNum = dataToMessageNum.remove(data); - // No duplicate messages. - assertNotNull(messageNum); - // Preserve timestamp. - assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp()); - // Preserve record id. - String recid = String.format("recordid_%d", messageNum); - assertArrayEquals(recid.getBytes(), reader.getCurrentRecordId()); - - if (i % 1000 == 999) { - // Estimated watermark can never get ahead of actual outstanding messages. - long watermark = reader.getWatermark().getMillis(); - long minOutstandingTimestamp = Long.MAX_VALUE; - for (Integer outstandingMessageNum : dataToMessageNum.values()) { - minOutstandingTimestamp = - Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum)); - } - assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp)); - // Ack messages, but only every other finalization. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); - if (i % 2000 == 1999) { - checkpoint.finalizeCheckpoint(); - } - } - } - // We are done. - assertFalse(reader.advance()); - // We saw each message exactly once. - assertTrue(dataToMessageNum.isEmpty()); - reader.close(); - } - - @Test - public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception { - TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic"); - factory = PubsubTestClient.createFactoryForCreateSubscription(); - PubsubUnboundedSource<String> source = - new PubsubUnboundedSource<>( - factory, - StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")), - StaticValueProvider.of(topicPath), - null, - StringUtf8Coder.of(), - null, - null, - null); - assertThat(source.getSubscription(), nullValue()); - - assertThat(source.getSubscription(), nullValue()); - - PipelineOptions options = PipelineOptionsFactory.create(); - List<PubsubSource<String>> splits = - (new PubsubSource<>(source)).generateInitialSplits(3, options); - // We have at least one returned split - assertThat(splits, hasSize(greaterThan(0))); - for (PubsubSource<String> split : splits) { - // Each split is equal - assertThat(split, equalTo(splits.get(0))); - } - - assertThat(splits.get(0).subscriptionPath, not(nullValue())); - } - - @Test - public void noSubscriptionNoSplitGeneratesSubscription() throws Exception { - TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic"); - factory = PubsubTestClient.createFactoryForCreateSubscription(); - PubsubUnboundedSource<String> source = - new PubsubUnboundedSource<>( - factory, - StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")), - StaticValueProvider.of(topicPath), - null, - StringUtf8Coder.of(), - null, - null, - null); - assertThat(source.getSubscription(), nullValue()); - - assertThat(source.getSubscription(), nullValue()); - - PipelineOptions options = PipelineOptionsFactory.create(); - PubsubSource<String> actualSource = new PubsubSource<>(source); - PubsubReader<String> reader = actualSource.createReader(options, null); - SubscriptionPath createdSubscription = reader.subscription; - assertThat(createdSubscription, not(nullValue())); - - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); - assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath())); - - checkpoint.finalizeCheckpoint(); - PubsubCheckpoint<String> deserCheckpoint = - CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint); - assertThat(checkpoint.subscriptionPath, not(nullValue())); - assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath)); - - PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint); - PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint); - - assertThat(readerFromOriginal.subscription, equalTo(createdSubscription)); - assertThat(readerFromDeser.subscription, equalTo(createdSubscription)); - } - - /** - * Tests that checkpoints finalized after the reader is closed succeed. - */ - @Test - public void closeWithActiveCheckpoints() throws Exception { - setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - reader.start(); - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); - reader.close(); - checkpoint.finalizeCheckpoint(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java deleted file mode 100644 index 1a99d38..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; - -import com.google.common.collect.ImmutableMap; -import java.util.Map; -import org.apache.beam.sdk.util.PubsubClient.ProjectPath; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for helper classes and methods in PubsubClient. - */ -@RunWith(JUnit4.class) -public class PubsubClientTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - - // - // Timestamp handling - // - - private long parse(String timestamp) { - Map<String, String> map = ImmutableMap.of("myLabel", timestamp); - return PubsubClient.extractTimestamp("myLabel", null, map); - } - - private void roundTripRfc339(String timestamp) { - assertEquals(Instant.parse(timestamp).getMillis(), parse(timestamp)); - } - - private void truncatedRfc339(String timestamp, String truncatedTimestmap) { - assertEquals(Instant.parse(truncatedTimestmap).getMillis(), parse(timestamp)); - } - - @Test - public void noTimestampLabelReturnsPubsubPublish() { - final long time = 987654321L; - long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null); - assertEquals(time, timestamp); - } - - @Test - public void noTimestampLabelAndInvalidPubsubPublishThrowsError() { - thrown.expect(NumberFormatException.class); - PubsubClient.extractTimestamp(null, "not-a-date", null); - } - - @Test - public void timestampLabelWithNullAttributesThrowsError() { - thrown.expect(RuntimeException.class); - thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel"); - PubsubClient.extractTimestamp("myLabel", null, null); - } - - @Test - public void timestampLabelSetWithMissingAttributeThrowsError() { - thrown.expect(RuntimeException.class); - thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel"); - Map<String, String> map = ImmutableMap.of("otherLabel", "whatever"); - PubsubClient.extractTimestamp("myLabel", null, map); - } - - @Test - public void timestampLabelParsesMillisecondsSinceEpoch() { - long time = 1446162101123L; - Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time)); - long timestamp = PubsubClient.extractTimestamp("myLabel", null, map); - assertEquals(time, timestamp); - } - - @Test - public void timestampLabelParsesRfc3339Seconds() { - roundTripRfc339("2015-10-29T23:41:41Z"); - } - - @Test - public void timestampLabelParsesRfc3339Tenths() { - roundTripRfc339("2015-10-29T23:41:41.1Z"); - } - - @Test - public void timestampLabelParsesRfc3339Hundredths() { - roundTripRfc339("2015-10-29T23:41:41.12Z"); - } - - @Test - public void timestampLabelParsesRfc3339Millis() { - roundTripRfc339("2015-10-29T23:41:41.123Z"); - } - - @Test - public void timestampLabelParsesRfc3339Micros() { - // Note: micros part 456/1000 is dropped. - truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z"); - } - - @Test - public void timestampLabelParsesRfc3339MicrosRounding() { - // Note: micros part 999/1000 is dropped, not rounded up. - truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z"); - } - - @Test - public void timestampLabelWithInvalidFormatThrowsError() { - thrown.expect(NumberFormatException.class); - parse("not-a-timestamp"); - } - - @Test - public void timestampLabelWithInvalidFormat2ThrowsError() { - thrown.expect(NumberFormatException.class); - parse("null"); - } - - @Test - public void timestampLabelWithInvalidFormat3ThrowsError() { - thrown.expect(NumberFormatException.class); - parse("2015-10"); - } - - @Test - public void timestampLabelParsesRfc3339WithSmallYear() { - // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted - // This is therefore a "small year" until this difference is reconciled. - roundTripRfc339("1582-10-15T01:23:45.123Z"); - } - - @Test - public void timestampLabelParsesRfc3339WithLargeYear() { - // Year 9999 in range. - roundTripRfc339("9999-10-29T23:41:41.123999Z"); - } - - @Test - public void timestampLabelRfc3339WithTooLargeYearThrowsError() { - thrown.expect(NumberFormatException.class); - // Year 10000 out of range. - parse("10000-10-29T23:41:41.123999Z"); - } - - // - // Paths - // - - @Test - public void projectPathFromIdWellFormed() { - ProjectPath path = PubsubClient.projectPathFromId("test"); - assertEquals("projects/test", path.getPath()); - } - - @Test - public void subscriptionPathFromNameWellFormed() { - SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something"); - assertEquals("projects/test/subscriptions/something", path.getPath()); - assertEquals("/subscriptions/test/something", path.getV1Beta1Path()); - } - - @Test - public void topicPathFromNameWellFormed() { - TopicPath path = PubsubClient.topicPathFromName("test", "something"); - assertEquals("projects/test/topics/something", path.getPath()); - assertEquals("/topics/test/something", path.getV1Beta1Path()); - } -}
