This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch revert-10331-master in repository https://gitbox.apache.org/repos/asf/beam.git
commit aeae2f417629c374cb025faa3a664a9646859a01 Author: Boyuan Zhang <[email protected]> AuthorDate: Fri Dec 27 10:39:26 2019 -0800 Revert "[BEAM-8932] Modify PubsubClient to use the proto message throughout." --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 6 +- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 139 ++++++++++++++++----- .../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java | 29 +++-- .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 9 +- .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java | 28 ++--- .../beam/sdk/io/gcp/pubsub/PubsubTestClient.java | 11 +- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 23 ++-- .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 33 +++-- .../apache/beam/sdk/io/gcp/pubsub/TestPubsub.java | 20 +-- .../beam/sdk/io/gcp/pubsub/TestPubsubSignal.java | 6 +- .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java | 1 - .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 19 ++- .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 8 +- .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 46 ++----- .../sdk/io/gcp/pubsub/PubsubTestClientTest.java | 31 ++--- .../sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java | 39 ++---- .../io/gcp/pubsub/PubsubUnboundedSourceTest.java | 28 ++--- 17 files changed, 226 insertions(+), 250 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 7bdf719..f3ba610 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -365,7 +365,7 @@ class BeamModulePlugin implements Plugin<Project> { def cassandra_driver_version = "3.6.0" def classgraph_version = "4.8.56" def generated_grpc_beta_version = "0.44.0" - def generated_grpc_ga_version = "1.83.0" + def generated_grpc_ga_version = "1.43.0" def generated_grpc_dc_beta_version = "0.27.0-alpha" def google_auth_version = "0.12.0" def google_clients_version = "1.28.0" @@ -384,7 +384,7 @@ class BeamModulePlugin implements Plugin<Project> { def postgres_version = "42.2.2" def powermock_version = "2.0.2" def proto_google_common_protos_version = "1.17.0" - def protobuf_version = "3.11.0" + def protobuf_version = "3.6.0" def quickcheck_version = "0.8" def spark_version = "2.4.4" def spark_structured_streaming_version = "2.4.0" @@ -445,7 +445,7 @@ class BeamModulePlugin implements Plugin<Project> { google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20181114-$google_clients_version", google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20181015-$google_clients_version", google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20190927-$google_clients_version", - google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20191111-$google_clients_version", + google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20181213-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20181109-$google_clients_version", google_auth_library_credentials : "com.google.auth:google-auth-library-credentials:$google_auth_version", google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http:$google_auth_version", diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 6f0f54d..07d6da6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -21,12 +21,10 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.DateTime; -import com.google.auto.value.AutoValue; -import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -300,37 +298,59 @@ public abstract class PubsubClient implements Closeable { * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java * serialization is never used for non-test clients. */ - @AutoValue - public abstract static class OutgoingMessage implements Serializable { + public static class OutgoingMessage implements Serializable { + /** Underlying (encoded) element. */ + public final byte[] elementBytes; - /** Underlying Message. May not have publish timestamp set. */ - public abstract PubsubMessage message(); + public final Map<String, String> attributes; /** Timestamp for element (ms since epoch). */ - public abstract long timestampMsSinceEpoch(); + public final long timestampMsSinceEpoch; /** * If using an id attribute, the record id to associate with this record's metadata so the * receiver can reject duplicates. Otherwise {@literal null}. */ - @Nullable - public abstract String recordId(); + @Nullable public final String recordId; - public static OutgoingMessage of( - PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) { - return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId); - } - - public static OutgoingMessage of( - org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message, + public OutgoingMessage( + byte[] elementBytes, + Map<String, String> attributes, long timestampMsSinceEpoch, @Nullable String recordId) { - PubsubMessage.Builder builder = - PubsubMessage.newBuilder().setData(ByteString.copyFrom(message.getPayload())); - if (message.getAttributeMap() != null) { - builder.putAllAttributes(message.getAttributeMap()); + this.elementBytes = elementBytes; + this.attributes = attributes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.recordId = recordId; + } + + @Override + public String toString() { + return String.format( + "OutgoingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; } - return of(builder.build(), timestampMsSinceEpoch, recordId); + + OutgoingMessage that = (OutgoingMessage) o; + + return timestampMsSinceEpoch == that.timestampMsSinceEpoch + && Arrays.equals(elementBytes, that.elementBytes) + && Objects.equal(attributes, that.attributes) + && Objects.equal(recordId, that.recordId); + } + + @Override + public int hashCode() { + return Objects.hashCode( + Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, recordId); } } @@ -340,35 +360,86 @@ public abstract class PubsubClient implements Closeable { * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. Java * serialization is never used for non-test clients. */ - @AutoValue - abstract static class IncomingMessage implements Serializable { + static class IncomingMessage implements Serializable { + /** Underlying (encoded) element. */ + public final byte[] elementBytes; - /** Underlying Message. */ - public abstract PubsubMessage message(); + public Map<String, String> attributes; /** * Timestamp for element (ms since epoch). Either Pubsub's processing time, or the custom * timestamp associated with the message. */ - public abstract long timestampMsSinceEpoch(); + public final long timestampMsSinceEpoch; /** Timestamp (in system time) at which we requested the message (ms since epoch). */ - public abstract long requestTimeMsSinceEpoch(); + public final long requestTimeMsSinceEpoch; /** Id to pass back to Pubsub to acknowledge receipt of this message. */ - public abstract String ackId(); + public final String ackId; /** Id to pass to the runner to distinguish this message from all others. */ - public abstract String recordId(); + public final String recordId; - public static IncomingMessage of( - PubsubMessage message, + public IncomingMessage( + byte[] elementBytes, + Map<String, String> attributes, long timestampMsSinceEpoch, long requestTimeMsSinceEpoch, String ackId, String recordId) { - return new AutoValue_PubsubClient_IncomingMessage( - message, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId); + this.elementBytes = elementBytes; + this.attributes = attributes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; + this.ackId = ackId; + this.recordId = recordId; + } + + public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) { + return new IncomingMessage( + elementBytes, + attributes, + timestampMsSinceEpoch, + requestTimeMsSinceEpoch, + ackId, + recordId); + } + + @Override + public String toString() { + return String.format( + "IncomingMessage(%db, %dms)", elementBytes.length, timestampMsSinceEpoch); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + IncomingMessage that = (IncomingMessage) o; + + return timestampMsSinceEpoch == that.timestampMsSinceEpoch + && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch + && ackId.equals(that.ackId) + && recordId.equals(that.recordId) + && Arrays.equals(elementBytes, that.elementBytes) + && Objects.equal(attributes, that.attributes); + } + + @Override + public int hashCode() { + return Objects.hashCode( + Arrays.hashCode(elementBytes), + attributes, + timestampMsSinceEpoch, + requestTimeMsSinceEpoch, + ackId, + recordId); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index a3b6b8d..ae3fa02 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.pubsub; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.auth.Credentials; +import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; @@ -212,15 +213,21 @@ public class PubsubGrpcClient extends PubsubClient { public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException { PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath()); for (OutgoingMessage outgoingMessage : outgoingMessages) { - PubsubMessage.Builder message = outgoingMessage.message().toBuilder(); + PubsubMessage.Builder message = + PubsubMessage.newBuilder().setData(ByteString.copyFrom(outgoingMessage.elementBytes)); + + if (outgoingMessage.attributes != null) { + message.putAllAttributes(outgoingMessage.attributes); + } if (timestampAttribute != null) { - message.putAttributes( - timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch())); + message + .getMutableAttributes() + .put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); } - if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) { - message.putAttributes(idAttribute, outgoingMessage.recordId()); + if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + message.getMutableAttributes().put(idAttribute, outgoingMessage.recordId); } request.addMessages(message); @@ -252,6 +259,9 @@ public class PubsubGrpcClient extends PubsubClient { PubsubMessage pubsubMessage = message.getMessage(); @Nullable Map<String, String> attributes = pubsubMessage.getAttributes(); + // Payload. + byte[] elementBytes = pubsubMessage.getData().toByteArray(); + // Timestamp. String pubsubTimestampString = null; Timestamp timestampProto = pubsubMessage.getPublishTime(); @@ -277,8 +287,13 @@ public class PubsubGrpcClient extends PubsubClient { } incomingMessages.add( - IncomingMessage.of( - pubsubMessage, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, recordId)); + new IncomingMessage( + elementBytes, + attributes, + timestampMsSinceEpoch, + requestTimeMsSinceEpoch, + ackId, + recordId)); } return incomingMessages; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 5f6d044..da5266f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1303,14 +1303,7 @@ public class PubsubIO { } // NOTE: The record id is always null. - output.add( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes) - .build(), - c.timestamp().getMillis(), - null)); + output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null)); currentOutputBytes += payload.length; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 7f3c771..136b1d2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -39,7 +39,6 @@ import com.google.api.services.pubsub.model.Topic; import com.google.auth.Credentials; import com.google.auth.http.HttpCredentialsAdapter; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -124,12 +123,8 @@ public class PubsubJsonClient extends PubsubClient { public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException { List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size()); for (OutgoingMessage outgoingMessage : outgoingMessages) { - PubsubMessage pubsubMessage = - new PubsubMessage().encodeData(outgoingMessage.message().getData().toByteArray()); + PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage)); - if (!outgoingMessage.message().getOrderingKey().isEmpty()) { - pubsubMessage.put("orderingKey", outgoingMessage.message().getOrderingKey()); - } pubsubMessages.add(pubsubMessage); } PublishRequest request = new PublishRequest().setMessages(pubsubMessages); @@ -140,16 +135,16 @@ public class PubsubJsonClient extends PubsubClient { private Map<String, String> getMessageAttributes(OutgoingMessage outgoingMessage) { Map<String, String> attributes = null; - if (outgoingMessage.message().getAttributesMap() == null) { + if (outgoingMessage.attributes == null) { attributes = new TreeMap<>(); } else { - attributes = new TreeMap<>(outgoingMessage.message().getAttributesMap()); + attributes = new TreeMap<>(outgoingMessage.attributes); } if (timestampAttribute != null) { - attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch())); + attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); } - if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) { - attributes.put(idAttribute, outgoingMessage.recordId()); + if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + attributes.put(idAttribute, outgoingMessage.recordId); } return attributes; } @@ -197,15 +192,10 @@ public class PubsubJsonClient extends PubsubClient { recordId = pubsubMessage.getMessageId(); } - com.google.pubsub.v1.PubsubMessage.Builder protoMessage = - com.google.pubsub.v1.PubsubMessage.newBuilder(); - protoMessage.setData(ByteString.copyFrom(elementBytes)); - protoMessage.putAllAttributes(attributes); - protoMessage.setOrderingKey( - (String) pubsubMessage.getUnknownKeys().getOrDefault("orderingKey", "")); incomingMessages.add( - IncomingMessage.of( - protoMessage.build(), + new IncomingMessage( + elementBytes, + attributes, timestampMsSinceEpoch, requestTimeMsSinceEpoch, ackId, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index c3b915d..6b20b56 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -309,17 +309,12 @@ public class PubsubTestClient extends PubsubClient implements Serializable { IncomingMessage incomingMessage = pendItr.next(); pendItr.remove(); IncomingMessage incomingMessageWithRequestTime = - IncomingMessage.of( - incomingMessage.message(), - incomingMessage.timestampMsSinceEpoch(), - requestTimeMsSinceEpoch, - incomingMessage.ackId(), - incomingMessage.recordId()); + incomingMessage.withRequestTime(requestTimeMsSinceEpoch); incomingMessages.add(incomingMessageWithRequestTime); STATE.pendingAckIncomingMessages.put( - incomingMessageWithRequestTime.ackId(), incomingMessageWithRequestTime); + incomingMessageWithRequestTime.ackId, incomingMessageWithRequestTime); STATE.ackDeadline.put( - incomingMessageWithRequestTime.ackId(), + incomingMessageWithRequestTime.ackId, requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000); if (incomingMessages.size() >= batchSize) { break; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 8be8c56..1258d0b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -30,6 +30,7 @@ import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; @@ -37,7 +38,6 @@ import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; @@ -101,18 +101,19 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, @Override public void encode(OutgoingMessage value, OutputStream outStream) throws CoderException, IOException { - ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.message(), outStream); - BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream); - RECORD_ID_CODER.encode(value.recordId(), outStream); + ByteArrayCoder.of().encode(value.elementBytes, outStream); + ATTRIBUTES_CODER.encode(value.attributes, outStream); + BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream); + RECORD_ID_CODER.encode(value.recordId, outStream); } @Override public OutgoingMessage decode(InputStream inStream) throws CoderException, IOException { - com.google.pubsub.v1.PubsubMessage message = - ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).decode(inStream); + byte[] elementBytes = ByteArrayCoder.of().decode(inStream); + Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream); long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream); @Nullable String recordId = RECORD_ID_CODER.decode(inStream); - return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId); + return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId); } } @@ -153,6 +154,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, elementCounter.inc(); PubsubMessage message = c.element(); byte[] elementBytes = message.getPayload(); + Map<String, String> attributes = message.getAttributeMap(); long timestampMsSinceEpoch = c.timestamp().getMillis(); @Nullable String recordId = null; @@ -173,7 +175,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, c.output( KV.of( ThreadLocalRandom.current().nextInt(numShards), - OutgoingMessage.of(message, timestampMsSinceEpoch, recordId))); + new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId))); } @Override @@ -244,8 +246,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize); int bytes = 0; for (OutgoingMessage message : c.element().getValue()) { - if (!pubsubMessages.isEmpty() - && bytes + message.message().getData().size() > publishBatchBytes) { + if (!pubsubMessages.isEmpty() && bytes + message.elementBytes.length > publishBatchBytes) { // Break large (in bytes) batches into smaller. // (We've already broken by batch size using the trigger below, though that may // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since @@ -256,7 +257,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, bytes = 0; } pubsubMessages.add(message); - bytes += message.message().getData().size(); + bytes += message.elementBytes.length; } if (!pubsubMessages.isEmpty()) { // BLOCKS until published. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 230161c..d8abfe1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -727,18 +727,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub // Capture the received messages. for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) { notYetRead.add(incomingMessage); - notYetReadBytes += incomingMessage.message().getData().size(); + notYetReadBytes += incomingMessage.elementBytes.length; inFlight.put( - incomingMessage.ackId(), + incomingMessage.ackId, new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch)); numReceived++; numReceivedRecently.add(requestTimeMsSinceEpoch, 1L); minReceivedTimestampMsSinceEpoch.add( - requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch()); + requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch); maxReceivedTimestampMsSinceEpoch.add( - requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch()); + requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch); minUnreadTimestampMsSinceEpoch.add( - requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch()); + requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch); } } @@ -837,7 +837,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub if (current != null) { // Current is consumed. It can no longer contribute to holding back the watermark. - minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch()); + minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch); current = null; } @@ -864,18 +864,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub // Try again later. return false; } - notYetReadBytes -= current.message().getData().size(); + notYetReadBytes -= current.elementBytes.length; checkState(notYetReadBytes >= 0); long nowMsSinceEpoch = now(); - numReadBytes.add(nowMsSinceEpoch, current.message().getData().size()); - minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch()); - if (current.timestampMsSinceEpoch() < lastWatermarkMsSinceEpoch) { + numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length); + minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch); + if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) { numLateMessages.add(nowMsSinceEpoch, 1L); } // Current message can be considered 'read' and will be persisted by the next // checkpoint. So it is now safe to ACK back to Pubsub. - safeToAckIds.add(current.ackId()); + safeToAckIds.add(current.ackId); return true; } @@ -884,10 +884,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub if (current == null) { throw new NoSuchElementException(); } - return new PubsubMessage( - current.message().getData().toByteArray(), - current.message().getAttributesMap(), - current.recordId()); + return new PubsubMessage(current.elementBytes, current.attributes, current.recordId); } @Override @@ -895,7 +892,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub if (current == null) { throw new NoSuchElementException(); } - return new Instant(current.timestampMsSinceEpoch()); + return new Instant(current.timestampMsSinceEpoch); } @Override @@ -903,7 +900,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub if (current == null) { throw new NoSuchElementException(); } - return current.recordId().getBytes(StandardCharsets.UTF_8); + return current.recordId.getBytes(StandardCharsets.UTF_8); } /** @@ -987,7 +984,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub List<String> snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds); List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size()); for (PubsubClient.IncomingMessage incomingMessage : notYetRead) { - snapshotNotYetReadIds.add(incomingMessage.ackId()); + snapshotNotYetReadIds.add(incomingMessage.ackId); } if (outer.subscriptionPath == null) { // need to include the subscription in case we resume, as it's not stored in the source. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java index e1e8711..1e75d43 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java @@ -22,14 +22,12 @@ import static org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.projectPathFromPath import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; @@ -207,16 +205,11 @@ public class TestPubsub implements TestRule { if (!messages.isEmpty()) { pubsub.acknowledge( subscriptionPath, - messages.stream().map(IncomingMessage::ackId).collect(ImmutableList.toImmutableList())); + messages.stream().map(msg -> msg.ackId).collect(ImmutableList.toImmutableList())); } return messages.stream() - .map( - msg -> - new PubsubMessage( - msg.message().getData().toByteArray(), - msg.message().getAttributesMap(), - msg.recordId())) + .map(msg -> new PubsubMessage(msg.elementBytes, msg.attributes, msg.recordId)) .collect(ImmutableList.toImmutableList()); } @@ -299,12 +292,7 @@ public class TestPubsub implements TestRule { } private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage message) { - return PubsubClient.OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(message.getPayload())) - .putAllAttributes(message.getAttributeMap()) - .build(), - DateTime.now().getMillis(), - null); + return new PubsubClient.OutgoingMessage( + message.getPayload(), message.getAttributeMap(), DateTime.now().getMillis(), null); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java index de4a715..f4f8b18 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.toList; import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; @@ -29,7 +30,6 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.state.BagState; @@ -251,7 +251,7 @@ public class TestPubsubSignal implements TestRule { try { signal = pubsub.pull(DateTime.now().getMillis(), signalSubscriptionPath, 1, false); pubsub.acknowledge( - signalSubscriptionPath, signal.stream().map(IncomingMessage::ackId).collect(toList())); + signalSubscriptionPath, signal.stream().map(m -> m.ackId).collect(toList())); break; } catch (StatusRuntimeException e) { if (!Status.DEADLINE_EXCEEDED.equals(e.getStatus())) { @@ -271,7 +271,7 @@ public class TestPubsubSignal implements TestRule { signalSubscriptionPath, duration.getStandardSeconds())); } - return signal.get(0).message().getData().toStringUtf8(); + return new String(signal.get(0).elementBytes, UTF_8); } private void sleep(long t) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index bc146ce..50f6548 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -67,7 +67,6 @@ public class GcpApiSurfaceTest { classesInPackage("com.google.cloud.bigtable.config"), classesInPackage("com.google.cloud.bigtable.data"), classesInPackage("com.google.spanner.v1"), - classesInPackage("com.google.pubsub.v1"), Matchers.equalTo(com.google.api.gax.rpc.ApiException.class), Matchers.<Class<?>>equalTo(com.google.api.gax.longrunning.OperationFuture.class), Matchers.<Class<?>>equalTo(com.google.api.gax.longrunning.OperationSnapshot.class), diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index 4dd719b..7c53170 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -142,11 +142,11 @@ public class PubsubGrpcClientTest { List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); assertEquals(1, acutalMessages.size()); IncomingMessage actualMessage = acutalMessages.get(0); - assertEquals(ACK_ID, actualMessage.ackId()); - assertEquals(DATA, actualMessage.message().getData().toStringUtf8()); - assertEquals(RECORD_ID, actualMessage.recordId()); - assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch()); - assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch()); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes, StandardCharsets.UTF_8)); + assertEquals(RECORD_ID, actualMessage.recordId); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); } finally { server.shutdownNow(); @@ -187,13 +187,8 @@ public class PubsubGrpcClientTest { InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start(); try { OutgoingMessage actualMessage = - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .putAllAttributes(ATTRIBUTES) - .build(), - MESSAGE_TIME, - RECORD_ID); + new OutgoingMessage( + DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_TIME, RECORD_ID); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 0dc910f..65b89a7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -29,7 +29,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import com.google.api.client.util.Clock; -import com.google.protobuf.ByteString; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -392,10 +391,9 @@ public class PubsubIOTest { }) .map( ba -> - IncomingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(ba)) - .build(), + new IncomingMessage( + ba, + null, 1234L, 0, UUID.randomUUID().toString(), diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index 434f0a3..f7fc0f3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -34,7 +34,6 @@ import com.google.api.services.pubsub.model.PullResponse; import com.google.api.services.pubsub.model.ReceivedMessage; import com.google.api.services.pubsub.model.Subscription; import com.google.api.services.pubsub.model.Topic; -import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -74,7 +73,6 @@ public class PubsubJsonClientTest { private static final String DATA = "testData"; private static final String RECORD_ID = "testRecordId"; private static final String ACK_ID = "testAckId"; - private static final String ORDERING_KEY = "testOrderingKey"; @Before public void setup() { @@ -100,8 +98,7 @@ public class PubsubJsonClientTest { .setPublishTime(String.valueOf(PUB_TIME)) .setAttributes( ImmutableMap.of( - TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID)) - .set("orderingKey", ORDERING_KEY); + TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID)); ReceivedMessage expectedReceivedMessage = new ReceivedMessage().setMessage(expectedPubsubMessage).setAckId(ACK_ID); PullResponse expectedResponse = @@ -116,12 +113,11 @@ public class PubsubJsonClientTest { List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); assertEquals(1, acutalMessages.size()); IncomingMessage actualMessage = acutalMessages.get(0); - assertEquals(ACK_ID, actualMessage.ackId()); - assertEquals(DATA, actualMessage.message().getData().toStringUtf8()); - assertEquals(RECORD_ID, actualMessage.recordId()); - assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch()); - assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch()); - assertEquals(ORDERING_KEY, actualMessage.message().getOrderingKey()); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes, StandardCharsets.UTF_8)); + assertEquals(RECORD_ID, actualMessage.recordId); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); } @Test @@ -150,7 +146,7 @@ public class PubsubJsonClientTest { List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); assertEquals(1, acutalMessages.size()); IncomingMessage actualMessage = acutalMessages.get(0); - assertArrayEquals(new byte[0], actualMessage.message().getData().toByteArray()); + assertArrayEquals(new byte[0], actualMessage.elementBytes); } @Test @@ -164,8 +160,7 @@ public class PubsubJsonClientTest { .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME)) .put(ID_ATTRIBUTE, RECORD_ID) .put("k", "v") - .build()) - .set("orderingKey", ORDERING_KEY); + .build()); PublishRequest expectedRequest = new PublishRequest().setMessages(ImmutableList.of(expectedPubsubMessage)); PublishResponse expectedResponse = @@ -176,14 +171,7 @@ public class PubsubJsonClientTest { Map<String, String> attrs = new HashMap<>(); attrs.put("k", "v"); OutgoingMessage actualMessage = - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .putAllAttributes(attrs) - .setOrderingKey(ORDERING_KEY) - .build(), - MESSAGE_TIME, - RECORD_ID); + new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), attrs, MESSAGE_TIME, RECORD_ID); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } @@ -207,12 +195,8 @@ public class PubsubJsonClientTest { (mockPubsub.projects().topics().publish(expectedTopic, expectedRequest).execute())) .thenReturn(expectedResponse); OutgoingMessage actualMessage = - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .build(), - MESSAGE_TIME, - RECORD_ID); + new OutgoingMessage( + DATA.getBytes(StandardCharsets.UTF_8), ImmutableMap.of(), MESSAGE_TIME, RECORD_ID); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } @@ -238,13 +222,7 @@ public class PubsubJsonClientTest { Map<String, String> attrs = new HashMap<>(); attrs.put("k", "v"); OutgoingMessage actualMessage = - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .putAllAttributes(attrs) - .build(), - MESSAGE_TIME, - RECORD_ID); + new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), attrs, MESSAGE_TIME, RECORD_ID); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java index 6b920e8..2b698f0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java @@ -20,9 +20,8 @@ package org.apache.beam.sdk.io.gcp.pubsub; import static org.junit.Assert.assertEquals; import com.google.api.client.util.Clock; -import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; @@ -55,8 +54,9 @@ public class PubsubTestClientTest { final AtomicLong now = new AtomicLong(); Clock clock = now::get; IncomingMessage expectedIncomingMessage = - IncomingMessage.of( - PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(), + new IncomingMessage( + DATA.getBytes(StandardCharsets.UTF_8), + null, MESSAGE_TIME, REQ_TIME, ACK_ID, @@ -75,14 +75,7 @@ public class PubsubTestClientTest { client.advance(); incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); assertEquals(1, incomingMessages.size()); - assertEquals( - IncomingMessage.of( - expectedIncomingMessage.message(), - expectedIncomingMessage.timestampMsSinceEpoch(), - now.get(), - expectedIncomingMessage.ackId(), - expectedIncomingMessage.recordId()), - incomingMessages.get(0)); + assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0)); now.addAndGet(10 * 1000); client.advance(); // Extend ack @@ -92,14 +85,7 @@ public class PubsubTestClientTest { client.advance(); incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); assertEquals(1, incomingMessages.size()); - assertEquals( - IncomingMessage.of( - expectedIncomingMessage.message(), - expectedIncomingMessage.timestampMsSinceEpoch(), - now.get(), - expectedIncomingMessage.ackId(), - expectedIncomingMessage.recordId()), - incomingMessages.get(0)); + assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0)); // Extend ack client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); // Ack @@ -113,10 +99,7 @@ public class PubsubTestClientTest { @Test public void publishOneMessage() throws IOException { OutgoingMessage expectedOutgoingMessage = - OutgoingMessage.of( - PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(), - MESSAGE_TIME, - MESSAGE_ID); + new OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), null, MESSAGE_TIME, MESSAGE_ID); try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish( TOPIC, Sets.newHashSet(expectedOutgoingMessage), ImmutableList.of())) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index f8cd86e..f588e05 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import com.google.protobuf.ByteString; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -84,12 +83,8 @@ public class PubsubUnboundedSinkTest implements Serializable { @Test public void saneCoder() throws Exception { OutgoingMessage message = - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .build(), - TIMESTAMP, - getRecordId(DATA)); + new OutgoingMessage( + DATA.getBytes(StandardCharsets.UTF_8), ImmutableMap.of(), TIMESTAMP, getRecordId(DATA)); CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); } @@ -98,13 +93,8 @@ public class PubsubUnboundedSinkTest implements Serializable { public void sendOneMessage() throws IOException { List<OutgoingMessage> outgoing = ImmutableList.of( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .putAllAttributes(ATTRIBUTES) - .build(), - TIMESTAMP, - getRecordId(DATA))); + new OutgoingMessage( + DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, TIMESTAMP, getRecordId(DATA))); int batchSize = 1; int batchBytes = 1; try (PubsubTestClientFactory factory = @@ -131,10 +121,9 @@ public class PubsubUnboundedSinkTest implements Serializable { public void sendOneMessageWithoutAttributes() throws IOException { List<OutgoingMessage> outgoing = ImmutableList.of( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .build(), + new OutgoingMessage( + DATA.getBytes(StandardCharsets.UTF_8), + null /* attributes */, TIMESTAMP, getRecordId(DATA))); try (PubsubTestClientFactory factory = @@ -168,10 +157,9 @@ public class PubsubUnboundedSinkTest implements Serializable { for (int i = 0; i < batchSize * 10; i++) { String str = String.valueOf(i); outgoing.add( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(str)) - .build(), + new OutgoingMessage( + str.getBytes(StandardCharsets.UTF_8), + ImmutableMap.of(), TIMESTAMP, getRecordId(str))); data.add(str); @@ -210,10 +198,9 @@ public class PubsubUnboundedSinkTest implements Serializable { } String str = sb.toString(); outgoing.add( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(str)) - .build(), + new OutgoingMessage( + str.getBytes(StandardCharsets.UTF_8), + ImmutableMap.of(), TIMESTAMP, getRecordId(str))); data.add(str); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index 43ecbdc..b2dacf0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -31,7 +31,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.api.client.util.Clock; -import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -101,14 +100,8 @@ public class PubsubUnboundedSourceTest { private void setupOneMessage() { setupOneMessage( ImmutableList.of( - IncomingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(DATA)) - .build(), - TIMESTAMP, - 0, - ACK_ID, - RECORD_ID))); + new IncomingMessage( + DATA.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ACK_ID, RECORD_ID))); } @After @@ -226,14 +219,8 @@ public class PubsubUnboundedSourceTest { String data = String.format("data_%d", i); String ackid = String.format("ackid_%d", i); incoming.add( - IncomingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(data)) - .build(), - TIMESTAMP, - 0, - ackid, - RECORD_ID)); + new IncomingMessage( + data.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ackid, RECORD_ID)); } setupOneMessage(incoming); PubsubReader reader = primSource.createReader(p.getOptions(), null); @@ -292,10 +279,9 @@ public class PubsubUnboundedSourceTest { String recid = String.format("recordid_%d", messageNum); String ackId = String.format("ackid_%d", messageNum); incoming.add( - IncomingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(data)) - .build(), + new IncomingMessage( + data.getBytes(StandardCharsets.UTF_8), + null, messageNumToTimestamp(messageNum), 0, ackId,
