Repository: beam
Updated Branches:
  refs/heads/master f5e3f5230 -> 14d60b26e


Renames {id,timestamp}Label to {id,timestamp}Attribute throughout SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8853d53d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8853d53d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8853d53d

Branch: refs/heads/master
Commit: 8853d53d9ffdf6e68c80880f6dd5f2d11a6e451e
Parents: f065114
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Apr 27 17:19:14 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Sat Apr 29 13:15:48 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |  2 +-
 .../examples/complete/game/LeaderBoard.java     |  2 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 18 ++---
 .../org/apache/beam/sdk/util/PropertyNames.java |  4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java    | 42 ++++++-----
 .../sdk/io/gcp/pubsub/PubsubGrpcClient.java     | 36 +++++-----
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 74 ++++++++++----------
 .../sdk/io/gcp/pubsub/PubsubJsonClient.java     | 36 +++++-----
 .../sdk/io/gcp/pubsub/PubsubTestClient.java     |  6 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  | 58 ++++++++-------
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 61 +++++++++-------
 .../sdk/io/gcp/pubsub/PubsubClientTest.java     | 50 ++++++-------
 .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 16 +++--
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    | 24 +++----
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 14 ++--
 .../io/gcp/pubsub/PubsubUnboundedSinkTest.java  | 10 +--
 .../gcp/pubsub/PubsubUnboundedSourceTest.java   |  6 +-
 17 files changed, 238 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index d628497..a46d3c5 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -252,7 +252,7 @@ public class GameStats extends LeaderBoard {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents = pipeline
         .apply(PubsubIO.readStrings()
-            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
+            .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     // Extract username/score pairs from the event stream

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index fbffac6..9af34c5 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -191,7 +191,7 @@ public class LeaderBoard extends HourlyTeamScore {
     // data elements, and parse the data.
     PCollection<GameActionInfo> gameEvents = pipeline
         .apply(PubsubIO.readStrings()
-            .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
+            .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     gameEvents.apply("CalculateTeamScores",

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 63c2191..a61fe49 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -922,12 +922,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               ((NestedValueProvider) 
overriddenTransform.getSubscriptionProvider()).propertyName());
         }
       }
-      if (overriddenTransform.getTimestampLabel() != null) {
+      if (overriddenTransform.getTimestampAttribute() != null) {
         stepContext.addInput(
-            PropertyNames.PUBSUB_TIMESTAMP_LABEL, 
overriddenTransform.getTimestampLabel());
+            PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, 
overriddenTransform.getTimestampAttribute());
       }
-      if (overriddenTransform.getIdLabel() != null) {
-        stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, 
overriddenTransform.getIdLabel());
+      if (overriddenTransform.getIdAttribute() != null) {
+        stepContext.addInput(
+            PropertyNames.PUBSUB_ID_ATTRIBUTE, 
overriddenTransform.getIdAttribute());
       }
       if (overriddenTransform.getWithAttributesParseFn() != null) {
         stepContext.addInput(
@@ -997,12 +998,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             PropertyNames.PUBSUB_TOPIC_OVERRIDE,
             ((NestedValueProvider) 
overriddenTransform.getTopicProvider()).propertyName());
       }
-      if (overriddenTransform.getTimestampLabel() != null) {
+      if (overriddenTransform.getTimestampAttribute() != null) {
         stepContext.addInput(
-            PropertyNames.PUBSUB_TIMESTAMP_LABEL, 
overriddenTransform.getTimestampLabel());
+            PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, 
overriddenTransform.getTimestampAttribute());
       }
-      if (overriddenTransform.getIdLabel() != null) {
-        stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, 
overriddenTransform.getIdLabel());
+      if (overriddenTransform.getIdAttribute() != null) {
+        stepContext.addInput(
+            PropertyNames.PUBSUB_ID_ATTRIBUTE, 
overriddenTransform.getIdAttribute());
       }
       if (overriddenTransform.getFormatFn() != null) {
         stepContext.addInput(

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
index ee25448..aa5855b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
@@ -82,11 +82,11 @@ public class PropertyNames {
   public static final String OUTPUT_NAME = "output_name";
   public static final String PARALLEL_INPUT = "parallel_input";
   public static final String PHASE = "phase";
-  public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
+  public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
   public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = 
"pubsub_serialized_attributes_fn";
   public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
   public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = 
"pubsub_subscription_runtime_override";
-  public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
+  public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = 
"pubsub_timestamp_label";
   public static final String PUBSUB_TOPIC = "pubsub_topic";
   public static final String PUBSUB_TOPIC_OVERRIDE = 
"pubsub_topic_runtime_override";
   public static final String SCALAR_FIELD_NAME = "value";

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 3a69799..cfe36ee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -42,16 +42,15 @@ abstract class PubsubClient implements Closeable {
    */
   public interface PubsubClientFactory extends Serializable {
     /**
-     * Construct a new Pubsub client. It should be closed via {@link #close} 
in order
-     * to ensure tidy cleanup of underlying netty resources (or use the 
try-with-resources
-     * construct). Uses {@code options} to derive pubsub endpoints and 
application credentials.
-     * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} 
to store custom
-     * timestamps/ids within message metadata.
+     * Construct a new Pubsub client. It should be closed via {@link #close} 
in order to ensure tidy
+     * cleanup of underlying netty resources (or use the try-with-resources 
construct). Uses {@code
+     * options} to derive pubsub endpoints and application credentials. If 
non-{@literal null}, use
+     * {@code timestampAttribute} and {@code idAttribute} to store custom 
timestamps/ids within
+     * message metadata.
      */
     PubsubClient newClient(
-        @Nullable String timestampLabel,
-        @Nullable String idLabel,
-        PubsubOptions options) throws IOException;
+        @Nullable String timestampAttribute, @Nullable String idAttribute, 
PubsubOptions options)
+        throws IOException;
 
     /**
      * Return the display name for this factory. Eg "Json", "gRPC".
@@ -86,33 +85,33 @@ abstract class PubsubClient implements Closeable {
    * Return the timestamp (in ms since unix epoch) to use for a Pubsub message 
with {@code
    * attributes} and {@code pubsubTimestamp}.
    *
-   * <p>If {@code timestampLabel} is non-{@literal null} then the message 
attributes must contain
-   * that label, and the value of that label will be taken as the timestamp.
+   * <p>If {@code timestampAttribute} is non-{@literal null} then the message 
attributes must
+   * contain that attribute, and the value of that attribute will be taken as 
the timestamp.
    * Otherwise the timestamp will be taken from the Pubsub publish timestamp 
{@code
    * pubsubTimestamp}.
    *
    * @throws IllegalArgumentException if the timestamp cannot be recognized as 
a ms-since-unix-epoch
-   * or RFC3339 time.
+   *     or RFC3339 time.
    */
   protected static long extractTimestamp(
-      @Nullable String timestampLabel,
+      @Nullable String timestampAttribute,
       @Nullable String pubsubTimestamp,
       @Nullable Map<String, String> attributes) {
     Long timestampMsSinceEpoch;
-    if (Strings.isNullOrEmpty(timestampLabel)) {
+    if (Strings.isNullOrEmpty(timestampAttribute)) {
       timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
       checkArgument(timestampMsSinceEpoch != null,
                     "Cannot interpret PubSub publish timestamp: %s",
                     pubsubTimestamp);
     } else {
-      String value = attributes == null ? null : 
attributes.get(timestampLabel);
+      String value = attributes == null ? null : 
attributes.get(timestampAttribute);
       checkArgument(value != null,
-                    "PubSub message is missing a value for timestamp label %s",
-                    timestampLabel);
+                    "PubSub message is missing a value for timestamp attribute 
%s",
+                    timestampAttribute);
       timestampMsSinceEpoch = asMsSinceEpoch(value);
       checkArgument(timestampMsSinceEpoch != null,
-                    "Cannot interpret value of label %s as timestamp: %s",
-                    timestampLabel, value);
+                    "Cannot interpret value of attribute %s as timestamp: %s",
+                    timestampAttribute, value);
     }
     return timestampMsSinceEpoch;
   }
@@ -317,11 +316,10 @@ abstract class PubsubClient implements Closeable {
     public final long timestampMsSinceEpoch;
 
     /**
-     * If using an id label, the record id to associate with this record's 
metadata so the receiver
-     * can reject duplicates. Otherwise {@literal null}.
+     * If using an id attribute, the record id to associate with this record's 
metadata so the
+     * receiver can reject duplicates. Otherwise {@literal null}.
      */
-    @Nullable
-    public final String recordId;
+    @Nullable public final String recordId;
 
     public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
                            long timestampMsSinceEpoch, @Nullable String 
recordId) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 16de648..9778edf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -79,7 +79,7 @@ class PubsubGrpcClient extends PubsubClient {
   private static class PubsubGrpcClientFactory implements PubsubClientFactory {
     @Override
     public PubsubClient newClient(
-        @Nullable String timestampLabel, @Nullable String idLabel, 
PubsubOptions options)
+        @Nullable String timestampAttribute, @Nullable String idAttribute, 
PubsubOptions options)
         throws IOException {
       ManagedChannel channel = NettyChannelBuilder
           .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
@@ -87,8 +87,8 @@ class PubsubGrpcClient extends PubsubClient {
           .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
           .build();
 
-      return new PubsubGrpcClient(timestampLabel,
-                                  idLabel,
+      return new PubsubGrpcClient(timestampAttribute,
+                                  idAttribute,
                                   DEFAULT_TIMEOUT_S,
                                   channel,
                                   options.getGcpCredential());
@@ -122,17 +122,17 @@ class PubsubGrpcClient extends PubsubClient {
   private final Credentials credentials;
 
   /**
-   * Label to use for custom timestamps, or {@literal null} if should use 
Pubsub publish time
+   * Attribute to use for custom timestamps, or {@literal null} if should use 
Pubsub publish time
    * instead.
    */
   @Nullable
-  private final String timestampLabel;
+  private final String timestampAttribute;
 
   /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub 
provided ids.
+   * Attribute to use for custom ids, or {@literal null} if should use Pubsub 
provided ids.
    */
   @Nullable
-  private final String idLabel;
+  private final String idAttribute;
 
 
   /**
@@ -144,13 +144,13 @@ class PubsubGrpcClient extends PubsubClient {
 
   @VisibleForTesting
   PubsubGrpcClient(
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
+      @Nullable String timestampAttribute,
+      @Nullable String idAttribute,
       int timeoutSec,
       ManagedChannel publisherChannel,
       Credentials credentials) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
+    this.timestampAttribute = timestampAttribute;
+    this.idAttribute = idAttribute;
     this.timeoutSec = timeoutSec;
     this.publisherChannel = publisherChannel;
     this.credentials = credentials;
@@ -226,13 +226,13 @@ class PubsubGrpcClient extends PubsubClient {
         message.putAllAttributes(outgoingMessage.attributes);
       }
 
-      if (timestampLabel != null) {
+      if (timestampAttribute != null) {
         message.getMutableAttributes()
-               .put(timestampLabel, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+               .put(timestampAttribute, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
       }
 
-      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) 
{
-        message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
+      if (idAttribute != null && 
!Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        message.getMutableAttributes().put(idAttribute, 
outgoingMessage.recordId);
       }
 
       request.addMessages(message);
@@ -273,7 +273,7 @@ class PubsubGrpcClient extends PubsubClient {
                                                + timestampProto.getNanos() / 
1000L);
       }
       long timestampMsSinceEpoch =
-          extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
+          extractTimestamp(timestampAttribute, pubsubTimestampString, 
attributes);
 
       // Ack id.
       String ackId = message.getAckId();
@@ -281,8 +281,8 @@ class PubsubGrpcClient extends PubsubClient {
 
       // Record id, if any.
       @Nullable String recordId = null;
-      if (idLabel != null && attributes != null) {
-        recordId = attributes.get(idLabel);
+      if (idAttribute != null && attributes != null) {
+        recordId = attributes.get(idAttribute);
       }
       if (Strings.isNullOrEmpty(recordId)) {
         // Fall back to the Pubsub provided message id.

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 3a7522e..129a25f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -136,12 +136,12 @@ public class PubsubIO {
    * Populate common {@link DisplayData} between Pubsub source and sink.
    */
   private static void populateCommonDisplayData(DisplayData.Builder builder,
-      String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) 
{
+      String timestampAttribute, String idAttribute, 
ValueProvider<PubsubTopic> topic) {
     builder
-        .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
-            .withLabel("Timestamp Label Attribute"))
-        .addIfNotNull(DisplayData.item("idLabel", idLabel)
-            .withLabel("ID Label Attribute"));
+        .addIfNotNull(DisplayData.item("timestampAttribute", 
timestampAttribute)
+            .withLabel("Timestamp Attribute"))
+        .addIfNotNull(DisplayData.item("idAttribute", idAttribute)
+            .withLabel("ID Attribute"));
 
     if (topic != null) {
       String topicString = topic.isAccessible() ? topic.get().asPath()
@@ -529,11 +529,11 @@ public class PubsubIO {
 
     /** The name of the message attribute to read timestamps from. */
     @Nullable
-    abstract String getTimestampLabel();
+    abstract String getTimestampAttribute();
 
     /** The name of the message attribute to read unique message IDs from. */
     @Nullable
-    abstract String getIdLabel();
+    abstract String getIdAttribute();
 
     /** The coder used to decode each record. */
     @Nullable
@@ -551,9 +551,9 @@ public class PubsubIO {
 
       abstract Builder<T> 
setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
 
-      abstract Builder<T> setTimestampLabel(String timestampLabel);
+      abstract Builder<T> setTimestampAttribute(String timestampAttribute);
 
-      abstract Builder<T> setIdLabel(String idLabel);
+      abstract Builder<T> setIdAttribute(String idAttribute);
 
       abstract Builder<T> setCoder(Coder<T> coder);
 
@@ -633,7 +633,7 @@ public class PubsubIO {
      * (i.e., time units smaller than milliseconds) will be ignored.
      * </ul>
      *
-     * <p>If {@code timestampLabel} is not provided, the system will generate 
record timestamps
+     * <p>If {@code timestampAttribute} is not provided, the system will 
generate record timestamps
      * the first time it sees each record. All windowing will be done relative 
to these
      * timestamps.
      *
@@ -643,12 +643,12 @@ public class PubsubIO {
      * specified with the windowing strategy &ndash; by default it will be 
output immediately.
      *
      * <p>Note that the system can guarantee that no late data will ever be 
seen when it assigns
-     * timestamps by arrival time (i.e. {@code timestampLabel} is not 
provided).
+     * timestamps by arrival time (i.e. {@code timestampAttribute} is not 
provided).
      *
     * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
      */
-    public Read<T> withTimestampLabel(String timestampLabel) {
-      return toBuilder().setTimestampLabel(timestampLabel).build();
+    public Read<T> withTimestampAttribute(String timestampAttribute) {
+      return toBuilder().setTimestampAttribute(timestampAttribute).build();
     }
 
     /**
@@ -657,11 +657,11 @@ public class PubsubIO {
      * The value of the attribute can be any string that uniquely identifies 
this record.
      *
      * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on 
the Pub/Sub stream.
-     * If {@code idLabel} is not provided, Beam cannot guarantee that no 
duplicate data will
+     * If {@code idAttribute} is not provided, Beam cannot guarantee that no 
duplicate data will
      * be delivered, and deduplication of the stream will be strictly best 
effort.
      */
-    public Read<T> withIdLabel(String idLabel) {
-      return toBuilder().setIdLabel(idLabel).build();
+    public Read<T> withIdAttribute(String idAttribute) {
+      return toBuilder().setIdAttribute(idAttribute).build();
     }
 
     /**
@@ -718,8 +718,8 @@ public class PubsubIO {
               topicPath,
               subscriptionPath,
               getCoder(),
-              getTimestampLabel(),
-              getIdLabel(),
+              getTimestampAttribute(),
+              getIdAttribute(),
               getParseFn());
       return input.getPipeline().apply(source);
     }
@@ -727,7 +727,8 @@ public class PubsubIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), 
getTopicProvider());
+      populateCommonDisplayData(
+          builder, getTimestampAttribute(), getIdAttribute(), 
getTopicProvider());
 
       if (getSubscriptionProvider() != null) {
         String subscriptionString = getSubscriptionProvider().isAccessible()
@@ -757,11 +758,11 @@ public class PubsubIO {
 
     /** The name of the message attribute to publish message timestamps in. */
     @Nullable
-    abstract String getTimestampLabel();
+    abstract String getTimestampAttribute();
 
     /** The name of the message attribute to publish unique message IDs in. */
     @Nullable
-    abstract String getIdLabel();
+    abstract String getIdAttribute();
 
     /** The input type Coder. */
     @Nullable
@@ -777,9 +778,9 @@ public class PubsubIO {
     abstract static class Builder<T> {
       abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> 
topicProvider);
 
-      abstract Builder<T> setTimestampLabel(String timestampLabel);
+      abstract Builder<T> setTimestampAttribute(String timestampAttribute);
 
-      abstract Builder<T> setIdLabel(String idLabel);
+      abstract Builder<T> setIdAttribute(String idAttribute);
 
       abstract Builder<T> setCoder(Coder<T> coder);
 
@@ -814,23 +815,23 @@ public class PubsubIO {
      * time classes, {@link Instant#Instant(long)} can be used to parse this 
value.
      *
      * <p>If the output from this sink is being read by another Beam pipeline, 
then
-     * {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure 
the other source reads
-     * these timestamps from the appropriate attribute.
+     * {@link PubsubIO.Read#withTimestampAttribute(String)} can be used to 
ensure the other source
+     * reads these timestamps from the appropriate attribute.
      */
-    public Write<T> withTimestampLabel(String timestampLabel) {
-      return toBuilder().setTimestampLabel(timestampLabel).build();
+    public Write<T> withTimestampAttribute(String timestampAttribute) {
+      return toBuilder().setTimestampAttribute(timestampAttribute).build();
     }
 
     /**
      * Writes to Pub/Sub, adding each record's unique identifier to the 
published messages in an
      * attribute with the specified name. The value of the attribute is an 
opaque string.
      *
-     * <p>If the the output from this sink is being read by another Beam 
pipeline, then
-     * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* 
the other source reads
+     * <p>If the the output from this sink is being read by another Beam 
pipeline, then {@link
+     * PubsubIO.Read#withIdAttribute(String)} can be used to ensure that* the 
other source reads
      * these unique identifiers from the appropriate attribute.
      */
-    public Write<T> withIdLabel(String idLabel) {
-      return toBuilder().setIdLabel(idLabel).build();
+    public Write<T> withIdAttribute(String idAttribute) {
+      return toBuilder().setIdAttribute(idAttribute).build();
     }
 
     /**
@@ -864,8 +865,8 @@ public class PubsubIO {
               FACTORY,
               NestedValueProvider.of(getTopicProvider(), new 
TopicPathTranslator()),
               getCoder(),
-              getTimestampLabel(),
-              getIdLabel(),
+              getTimestampAttribute(),
+              getIdAttribute(),
               getFormatFn(),
               100 /* numShards */));
       }
@@ -875,7 +876,8 @@ public class PubsubIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), 
getTopicProvider());
+      populateCommonDisplayData(
+          builder, getTimestampAttribute(), getIdAttribute(), 
getTopicProvider());
     }
 
     @Override
@@ -897,9 +899,9 @@ public class PubsubIO {
       @StartBundle
       public void startBundle(Context c) throws IOException {
         this.output = new ArrayList<>();
-        // NOTE: idLabel is ignored.
+        // NOTE: idAttribute is ignored.
         this.pubsubClient =
-            FACTORY.newClient(getTimestampLabel(), null,
+            FACTORY.newClient(getTimestampAttribute(), null,
                 c.getPipelineOptions().as(PubsubOptions.class));
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 39184fb..b745422 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -69,7 +69,7 @@ class PubsubJsonClient extends PubsubClient {
 
     @Override
     public PubsubClient newClient(
-        @Nullable String timestampLabel, @Nullable String idLabel, 
PubsubOptions options)
+        @Nullable String timestampAttribute, @Nullable String idAttribute, 
PubsubOptions options)
         throws IOException {
       Pubsub pubsub = new Builder(
           Transport.getTransport(),
@@ -82,7 +82,7 @@ class PubsubJsonClient extends PubsubClient {
           .setApplicationName(options.getAppName())
           .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
           .build();
-      return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+      return new PubsubJsonClient(timestampAttribute, idAttribute, pubsub);
     }
 
     @Override
@@ -97,17 +97,17 @@ class PubsubJsonClient extends PubsubClient {
   public static final PubsubClientFactory FACTORY = new 
PubsubJsonClientFactory();
 
   /**
-   * Label to use for custom timestamps, or {@literal null} if should use 
Pubsub publish time
+   * Attribute to use for custom timestamps, or {@literal null} if should use 
Pubsub publish time
    * instead.
    */
   @Nullable
-  private final String timestampLabel;
+  private final String timestampAttribute;
 
   /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub 
provided ids.
+   * Attribute to use for custom ids, or {@literal null} if should use Pubsub 
provided ids.
    */
   @Nullable
-  private final String idLabel;
+  private final String idAttribute;
 
   /**
    * Underlying JSON transport.
@@ -116,11 +116,11 @@ class PubsubJsonClient extends PubsubClient {
 
   @VisibleForTesting
   PubsubJsonClient(
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
+      @Nullable String timestampAttribute,
+      @Nullable String idAttribute,
       Pubsub pubsub) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
+    this.timestampAttribute = timestampAttribute;
+    this.idAttribute = idAttribute;
     this.pubsub = pubsub;
   }
 
@@ -137,19 +137,19 @@ class PubsubJsonClient extends PubsubClient {
       PubsubMessage pubsubMessage = new 
PubsubMessage().encodeData(outgoingMessage.elementBytes);
 
       Map<String, String> attributes = outgoingMessage.attributes;
-      if ((timestampLabel != null || idLabel != null) && attributes == null) {
+      if ((timestampAttribute != null || idAttribute != null) && attributes == 
null) {
         attributes = new TreeMap<>();
       }
       if (attributes != null) {
         pubsubMessage.setAttributes(attributes);
       }
 
-      if (timestampLabel != null) {
-        attributes.put(timestampLabel, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      if (timestampAttribute != null) {
+        attributes.put(timestampAttribute, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
       }
 
-      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) 
{
-        attributes.put(idLabel, outgoingMessage.recordId);
+      if (idAttribute != null && 
!Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        attributes.put(idAttribute, outgoingMessage.recordId);
       }
 
       pubsubMessages.add(pubsubMessage);
@@ -188,7 +188,7 @@ class PubsubJsonClient extends PubsubClient {
 
       // Timestamp.
       long timestampMsSinceEpoch =
-          extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
+          extractTimestamp(timestampAttribute, message.getMessage().getPublishTime(), attributes);
 
       // Ack id.
       String ackId = message.getAckId();
@@ -196,8 +196,8 @@ class PubsubJsonClient extends PubsubClient {
 
       // Record id, if any.
       @Nullable String recordId = null;
-      if (idLabel != null && attributes != null) {
-        recordId = attributes.get(idLabel);
+      if (idAttribute != null && attributes != null) {
+        recordId = attributes.get(idAttribute);
       }
       if (Strings.isNullOrEmpty(recordId)) {
         // Fall back to the Pubsub provided message id.

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index 9d40e41..df90597 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -136,7 +136,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
     return new PubsubTestClientFactory() {
       @Override
       public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
           throws IOException {
         return new PubsubTestClient();
       }
@@ -182,7 +182,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
     return new PubsubTestClientFactory() {
       @Override
       public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
           throws IOException {
         return new PubsubTestClient();
       }
@@ -226,7 +226,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
 
       @Override
       public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
           throws IOException {
         return new PubsubTestClient() {
           @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 002e979..8d273ba 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -224,8 +224,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
     private final PubsubClientFactory pubsubFactory;
     private final ValueProvider<TopicPath> topic;
-    private final String timestampLabel;
-    private final String idLabel;
+    private final String timestampAttribute;
+    private final String idAttribute;
     private final int publishBatchSize;
     private final int publishBatchBytes;
 
@@ -240,12 +240,16 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     private final Counter byteCounter = SinkMetrics.bytesWritten();
 
     WriterFn(
-        PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
-        String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) {
+        PubsubClientFactory pubsubFactory,
+        ValueProvider<TopicPath> topic,
+        String timestampAttribute,
+        String idAttribute,
+        int publishBatchSize,
+        int publishBatchBytes) {
       this.pubsubFactory = pubsubFactory;
       this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.idLabel = idLabel;
+      this.timestampAttribute = timestampAttribute;
+      this.idAttribute = idAttribute;
       this.publishBatchSize = publishBatchSize;
       this.publishBatchBytes = publishBatchBytes;
     }
@@ -267,7 +271,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     @StartBundle
     public void startBundle(Context c) throws Exception {
       checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
-      pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
+      pubsubClient = pubsubFactory.newClient(timestampAttribute, idAttribute,
                                              
c.getPipelineOptions().as(PubsubOptions.class));
     }
 
@@ -311,8 +315,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
             : topic.toString();
       builder.add(DisplayData.item("topic", topicString));
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
-      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
-      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+      builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
+      builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
     }
   }
 
@@ -341,14 +345,14 @@ public class PubsubUnboundedSink<T> extends 
PTransform<PCollection<T>, PDone> {
    * Pubsub message publish timestamp instead.
    */
   @Nullable
-  private final String timestampLabel;
+  private final String timestampAttribute;
 
   /**
    * Pubsub metadata field holding id for each element, or {@literal null} if 
need to generate
    * a unique id ourselves.
    */
   @Nullable
-  private final String idLabel;
+  private final String idAttribute;
 
   /**
    * Number of 'shards' to use so that latency in Pubsub publish can be 
hidden. Generally this
@@ -374,7 +378,7 @@ public class PubsubUnboundedSink<T> extends 
PTransform<PCollection<T>, PDone> {
   private final Duration maxLatency;
 
   /**
-   * How record ids should be generated for each record (if {@link #idLabel} 
is non-{@literal
+   * How record ids should be generated for each record (if {@link 
#idAttribute} is non-{@literal
    * null}).
    */
   private final RecordIdMethod recordIdMethod;
@@ -390,8 +394,8 @@ public class PubsubUnboundedSink<T> extends 
PTransform<PCollection<T>, PDone> {
       PubsubClientFactory pubsubFactory,
       ValueProvider<TopicPath> topic,
       Coder<T> elementCoder,
-      String timestampLabel,
-      String idLabel,
+      String timestampAttribute,
+      String idAttribute,
       int numShards,
       int publishBatchSize,
       int publishBatchBytes,
@@ -401,25 +405,25 @@ public class PubsubUnboundedSink<T> extends 
PTransform<PCollection<T>, PDone> {
     this.pubsubFactory = pubsubFactory;
     this.topic = topic;
     this.elementCoder = elementCoder;
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
+    this.timestampAttribute = timestampAttribute;
+    this.idAttribute = idAttribute;
     this.numShards = numShards;
     this.publishBatchSize = publishBatchSize;
     this.publishBatchBytes = publishBatchBytes;
     this.maxLatency = maxLatency;
     this.formatFn = formatFn;
-    this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : 
recordIdMethod;
+    this.recordIdMethod = idAttribute == null ? RecordIdMethod.NONE : 
recordIdMethod;
   }
 
   public PubsubUnboundedSink(
       PubsubClientFactory pubsubFactory,
       ValueProvider<TopicPath> topic,
       Coder<T> elementCoder,
-      String timestampLabel,
-      String idLabel,
+      String timestampAttribute,
+      String idAttribute,
       SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
       int numShards) {
-    this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, 
numShards,
+    this(pubsubFactory, topic, elementCoder, timestampAttribute, idAttribute, 
numShards,
          DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, 
DEFAULT_MAX_LATENCY,
          formatFn, RecordIdMethod.RANDOM);
   }
@@ -439,19 +443,19 @@ public class PubsubUnboundedSink<T> extends 
PTransform<PCollection<T>, PDone> {
   }
 
   /**
-   * Get the timestamp label.
+   * Get the timestamp attribute.
    */
   @Nullable
-  public String getTimestampLabel() {
-    return timestampLabel;
+  public String getTimestampAttribute() {
+    return timestampAttribute;
   }
 
   /**
-   * Get the id label.
+   * Get the id attribute.
    */
   @Nullable
-  public String getIdLabel() {
-    return idLabel;
+  public String getIdAttribute() {
+    return idAttribute;
   }
 
   /**
@@ -483,7 +487,7 @@ public class PubsubUnboundedSink<T> extends 
PTransform<PCollection<T>, PDone> {
          .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
          .apply(GroupByKey.<Integer, OutgoingMessage>create())
          .apply("PubsubUnboundedSink.Writer",
-             ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, 
idLabel,
+             ParDo.of(new WriterFn(pubsubFactory, topic, timestampAttribute, 
idAttribute,
                  publishBatchSize, publishBatchBytes)));
     return PDone.in(input.getPipeline());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 6392fd2..903ae41 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -602,7 +602,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       pubsubClient =
           new AtomicReference<>(
               outer.outer.pubsubFactory.newClient(
-                  outer.outer.timestampLabel, outer.outer.idLabel, options));
+                  outer.outer.timestampAttribute, outer.outer.idAttribute, options));
       ackTimeoutMs = -1;
       safeToAckIds = new HashSet<>();
       notYetRead = new ArrayDeque<>();
@@ -1207,22 +1207,22 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
     @Nullable
     private final ValueProvider<TopicPath> topic;
     @Nullable
-    private final String timestampLabel;
+    private final String timestampAttribute;
     @Nullable
-    private final String idLabel;
+    private final String idAttribute;
 
     public StatsFn(
         PubsubClientFactory pubsubFactory,
         @Nullable ValueProvider<SubscriptionPath> subscription,
         @Nullable ValueProvider<TopicPath> topic,
-        @Nullable String timestampLabel,
-        @Nullable String idLabel) {
+        @Nullable String timestampAttribute,
+        @Nullable String idAttribute) {
       checkArgument(pubsubFactory != null, "pubsubFactory should not be null");
       this.pubsubFactory = pubsubFactory;
       this.subscription = subscription;
       this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.idLabel = idLabel;
+      this.timestampAttribute = timestampAttribute;
+      this.idAttribute = idAttribute;
     }
 
     @ProcessElement
@@ -1247,8 +1247,8 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
         builder.add(DisplayData.item("topic", topicString));
       }
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
-      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
-      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+      builder.addIfNotNull(DisplayData.item("timestampAttribute", 
timestampAttribute));
+      builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
     }
   }
 
@@ -1303,14 +1303,14 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
    * Pubsub message publish timestamp instead.
    */
   @Nullable
-  private final String timestampLabel;
+  private final String timestampAttribute;
 
   /**
    * Pubsub metadata field holding id for each element, or {@literal null} if 
need to generate
    * a unique id ourselves.
    */
   @Nullable
-  private final String idLabel;
+  private final String idAttribute;
 
   /**
    * If not {@literal null}, the user is asking for PubSub attributes. This 
parse function will be
@@ -1327,8 +1327,8 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
       @Nullable ValueProvider<TopicPath> topic,
       @Nullable ValueProvider<SubscriptionPath> subscription,
       Coder<T> elementCoder,
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
+      @Nullable String timestampAttribute,
+      @Nullable String idAttribute,
       @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
     checkArgument((topic == null) != (subscription == null),
                   "Exactly one of topic and subscription must be given");
@@ -1340,8 +1340,8 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
     this.topic = topic;
     this.subscription = subscription;
     this.elementCoder = checkNotNull(elementCoder);
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
+    this.timestampAttribute = timestampAttribute;
+    this.idAttribute = idAttribute;
     this.parseFn = parseFn;
   }
 
@@ -1354,10 +1354,18 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
       @Nullable ValueProvider<TopicPath> topic,
       @Nullable ValueProvider<SubscriptionPath> subscription,
       Coder<T> elementCoder,
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
+      @Nullable String timestampAttribute,
+      @Nullable String idAttribute,
       @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
-    this(null, pubsubFactory, project, topic, subscription, elementCoder, 
timestampLabel, idLabel,
+    this(
+        null,
+        pubsubFactory,
+        project,
+        topic,
+        subscription,
+        elementCoder,
+        timestampAttribute,
+        idAttribute,
         parseFn);
   }
 
@@ -1409,19 +1417,19 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
   }
 
   /**
-   * Get the timestamp label.
+   * Get the timestamp attribute.
    */
   @Nullable
-  public String getTimestampLabel() {
-    return timestampLabel;
+  public String getTimestampAttribute() {
+    return timestampAttribute;
   }
 
   /**
-   * Get the id label.
+   * Get the id attribute.
    */
   @Nullable
-  public String getIdLabel() {
-    return idLabel;
+  public String getIdAttribute() {
+    return idAttribute;
   }
 
   /**
@@ -1438,13 +1446,14 @@ public class PubsubUnboundedSource<T> extends 
PTransform<PBegin, PCollection<T>>
                 .apply(Read.from(new PubsubSource<T>(this)))
                 .apply("PubsubUnboundedSource.Stats",
                     ParDo.of(new StatsFn<T>(
-                        pubsubFactory, subscription, topic, timestampLabel, 
idLabel)));
+                        pubsubFactory, subscription, topic, 
timestampAttribute, idAttribute)));
   }
 
   private SubscriptionPath createRandomSubscription(PipelineOptions options) {
     try {
       try (PubsubClient pubsubClient =
-          pubsubFactory.newClient(timestampLabel, idLabel, 
options.as(PubsubOptions.class))) {
+          pubsubFactory.newClient(
+              timestampAttribute, idAttribute, 
options.as(PubsubOptions.class))) {
         checkState(project.isAccessible(), "createRandomSubscription must be 
called at runtime.");
         checkState(topic.isAccessible(), "createRandomSubscription must be 
called at runtime.");
         SubscriptionPath subscriptionPath =

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
index 14c36f9..d37235f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
@@ -45,8 +45,8 @@ public class PubsubClientTest {
   //
 
   private long parse(String timestamp) {
-    Map<String, String> map = ImmutableMap.of("myLabel", timestamp);
-    return PubsubClient.extractTimestamp("myLabel", null, map);
+    Map<String, String> map = ImmutableMap.of("myAttribute", timestamp);
+    return PubsubClient.extractTimestamp("myAttribute", null, map);
   }
 
   private void roundTripRfc339(String timestamp) {
@@ -58,106 +58,106 @@ public class PubsubClientTest {
   }
 
   @Test
-  public void noTimestampLabelReturnsPubsubPublish() {
+  public void noTimestampAttributeReturnsPubsubPublish() {
     final long time = 987654321L;
     long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null);
     assertEquals(time, timestamp);
   }
 
   @Test
-  public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
+  public void noTimestampAttributeAndInvalidPubsubPublishThrowsError() {
     thrown.expect(NumberFormatException.class);
     PubsubClient.extractTimestamp(null, "not-a-date", null);
   }
 
   @Test
-  public void timestampLabelWithNullAttributesThrowsError() {
+  public void timestampAttributeWithNullAttributesThrowsError() {
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
-    PubsubClient.extractTimestamp("myLabel", null, null);
+    thrown.expectMessage("PubSub message is missing a value for timestamp attribute myAttribute");
+    PubsubClient.extractTimestamp("myAttribute", null, null);
   }
 
   @Test
-  public void timestampLabelSetWithMissingAttributeThrowsError() {
+  public void timestampAttributeSetWithMissingAttributeThrowsError() {
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+    thrown.expectMessage("PubSub message is missing a value for timestamp attribute myAttribute");
     Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
-    PubsubClient.extractTimestamp("myLabel", null, map);
+    PubsubClient.extractTimestamp("myAttribute", null, map);
   }
 
   @Test
-  public void timestampLabelParsesMillisecondsSinceEpoch() {
+  public void timestampAttributeParsesMillisecondsSinceEpoch() {
     long time = 1446162101123L;
-    Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
-    long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
+    Map<String, String> map = ImmutableMap.of("myAttribute", String.valueOf(time));
+    long timestamp = PubsubClient.extractTimestamp("myAttribute", null, map);
     assertEquals(time, timestamp);
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Seconds() {
+  public void timestampAttributeParsesRfc3339Seconds() {
     roundTripRfc339("2015-10-29T23:41:41Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Tenths() {
+  public void timestampAttributeParsesRfc3339Tenths() {
     roundTripRfc339("2015-10-29T23:41:41.1Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Hundredths() {
+  public void timestampAttributeParsesRfc3339Hundredths() {
     roundTripRfc339("2015-10-29T23:41:41.12Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Millis() {
+  public void timestampAttributeParsesRfc3339Millis() {
     roundTripRfc339("2015-10-29T23:41:41.123Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339Micros() {
+  public void timestampAttributeParsesRfc3339Micros() {
     // Note: micros part 456/1000 is dropped.
     truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339MicrosRounding() {
+  public void timestampAttributeParsesRfc3339MicrosRounding() {
     // Note: micros part 999/1000 is dropped, not rounded up.
     truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
   }
 
   @Test
-  public void timestampLabelWithInvalidFormatThrowsError() {
+  public void timestampAttributeWithInvalidFormatThrowsError() {
     thrown.expect(NumberFormatException.class);
     parse("not-a-timestamp");
   }
 
   @Test
-  public void timestampLabelWithInvalidFormat2ThrowsError() {
+  public void timestampAttributeWithInvalidFormat2ThrowsError() {
     thrown.expect(NumberFormatException.class);
     parse("null");
   }
 
   @Test
-  public void timestampLabelWithInvalidFormat3ThrowsError() {
+  public void timestampAttributeWithInvalidFormat3ThrowsError() {
     thrown.expect(NumberFormatException.class);
     parse("2015-10");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339WithSmallYear() {
+  public void timestampAttributeParsesRfc3339WithSmallYear() {
     // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian 
Calendar was adopted
     // This is therefore a "small year" until this difference is reconciled.
     roundTripRfc339("1582-10-15T01:23:45.123Z");
   }
 
   @Test
-  public void timestampLabelParsesRfc3339WithLargeYear() {
+  public void timestampAttributeParsesRfc3339WithLargeYear() {
     // Year 9999 in range.
     roundTripRfc339("9999-10-29T23:41:41.123999Z");
   }
 
   @Test
-  public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
+  public void timestampAttributeRfc3339WithTooLargeYearThrowsError() {
     thrown.expect(NumberFormatException.class);
     // Year 10000 out of range.
     parse("10000-10-29T23:41:41.123999Z");

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
index 63721dc..87d6029 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
@@ -72,8 +72,8 @@ public class PubsubGrpcClientTest {
   private static final long REQ_TIME = 1234L;
   private static final long PUB_TIME = 3456L;
   private static final long MESSAGE_TIME = 6789L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+  private static final String ID_ATTRIBUTE = "id";
   private static final String MESSAGE_ID = "testMessageId";
   private static final String DATA = "testData";
   private static final String RECORD_ID = "testRecordId";
@@ -87,7 +87,9 @@ public class PubsubGrpcClientTest {
         PubsubGrpcClientTest.class.getName(), 
ThreadLocalRandom.current().nextInt());
     inProcessChannel = 
InProcessChannelBuilder.forName(channelName).directExecutor().build();
     testCredentials = new TestCredential();
-    client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, 
inProcessChannel, testCredentials);
+    client =
+        new PubsubGrpcClient(
+            TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, inProcessChannel, 
testCredentials);
   }
 
   @After
@@ -117,9 +119,9 @@ public class PubsubGrpcClientTest {
                      .setPublishTime(timestamp)
                      .putAllAttributes(ATTRIBUTES)
                      .putAllAttributes(
-                         ImmutableMap.of(TIMESTAMP_LABEL,
+                         ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
                                          String.valueOf(MESSAGE_TIME),
-                                         ID_LABEL, RECORD_ID))
+                             ID_ATTRIBUTE, RECORD_ID))
                      .build();
     ReceivedMessage expectedReceivedMessage =
         ReceivedMessage.newBuilder()
@@ -167,8 +169,8 @@ public class PubsubGrpcClientTest {
                      .setData(ByteString.copyFrom(DATA.getBytes()))
                      .putAllAttributes(ATTRIBUTES)
                      .putAllAttributes(
-                         ImmutableMap.of(TIMESTAMP_LABEL, 
String.valueOf(MESSAGE_TIME),
-                                         ID_LABEL, RECORD_ID))
+                         ImmutableMap.of(TIMESTAMP_ATTRIBUTE, 
String.valueOf(MESSAGE_TIME),
+                                         ID_ATTRIBUTE, RECORD_ID))
                      .build();
     final PublishRequest expectedRequest =
         PublishRequest.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 20039d4..5f06b88 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -93,14 +93,14 @@ public class PubsubIOTest {
     Duration maxReadTime = Duration.standardMinutes(5);
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .fromTopic(StaticValueProvider.of(topic))
-        .withTimestampLabel("myTimestamp")
-        .withIdLabel("myId");
+        .withTimestampAttribute("myTimestamp")
+        .withIdAttribute("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("topic", topic));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+    assertThat(displayData, hasDisplayItem("timestampAttribute", 
"myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
   }
 
   @Test
@@ -110,14 +110,14 @@ public class PubsubIOTest {
     Duration maxReadTime = Duration.standardMinutes(5);
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .fromSubscription(StaticValueProvider.of(subscription))
-        .withTimestampLabel("myTimestamp")
-        .withIdLabel("myId");
+        .withTimestampAttribute("myTimestamp")
+        .withIdAttribute("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("subscription", subscription));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+    assertThat(displayData, hasDisplayItem("timestampAttribute", 
"myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
   }
 
   @Test
@@ -168,14 +168,14 @@ public class PubsubIOTest {
     String topic = "projects/project/topics/topic";
     PubsubIO.Write<?> write = PubsubIO.<String>write()
         .to(topic)
-        .withTimestampLabel("myTimestamp")
-        .withIdLabel("myId");
+        .withTimestampAttribute("myTimestamp")
+        .withIdAttribute("myId");
 
     DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("topic", topic));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+    assertThat(displayData, hasDisplayItem("timestampAttribute", 
"myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index d290994..578f814 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -58,8 +58,8 @@ public class PubsubJsonClientTest {
   private static final long REQ_TIME = 1234L;
   private static final long PUB_TIME = 3456L;
   private static final long MESSAGE_TIME = 6789L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+  private static final String ID_ATTRIBUTE = "id";
   private static final String MESSAGE_ID = "testMessageId";
   private static final String DATA = "testData";
   private static final String RECORD_ID = "testRecordId";
@@ -68,7 +68,7 @@ public class PubsubJsonClientTest {
   @Before
   public void setup() throws IOException {
     mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
-    client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+    client = new PubsubJsonClient(TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 
mockPubsub);
   }
 
   @After
@@ -88,8 +88,8 @@ public class PubsubJsonClientTest {
         .encodeData(DATA.getBytes())
         .setPublishTime(String.valueOf(PUB_TIME))
         .setAttributes(
-            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                            ID_LABEL, RECORD_ID));
+            ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME),
+                ID_ATTRIBUTE, RECORD_ID));
     ReceivedMessage expectedReceivedMessage =
         new ReceivedMessage().setMessage(expectedPubsubMessage)
                              .setAckId(ACK_ID);
@@ -117,8 +117,8 @@ public class PubsubJsonClientTest {
         .encodeData(DATA.getBytes())
         .setAttributes(
             ImmutableMap.<String, String> builder()
-                    .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
-                    .put(ID_LABEL, RECORD_ID)
+                    .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
+                    .put(ID_ATTRIBUTE, RECORD_ID)
                     .put("k", "v").build());
     PublishRequest expectedRequest = new PublishRequest()
         .setMessages(ImmutableList.of(expectedPubsubMessage));

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index be425d4..580ada9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
   private static final Map<String, String> ATTRIBUTES =
           ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
   private static final long TIMESTAMP = 1234L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+  private static final String ID_ATTRIBUTE = "id";
   private static final int NUM_SHARDS = 10;
 
   private static class Stamp extends DoFn<String, String> {
@@ -99,7 +99,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
                                                       ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
-              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+              TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, batchSize, batchBytes,
               Duration.standardSeconds(2),
               new SimpleFunction<String, PubsubIO.PubsubMessage>() {
                 @Override
@@ -135,7 +135,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
                                                       ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
-              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+              TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, batchSize, batchBytes,
               Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
@@ -170,7 +170,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
                                                       ImmutableList.<OutgoingMessage>of())) {
       PubsubUnboundedSink<String> sink =
           new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
-              StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+              StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE,
               NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
               null, RecordIdMethod.DETERMINISTIC);
       p.apply(Create.of(data))

http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 949ba4f..dc66ea1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -70,8 +70,8 @@ public class PubsubUnboundedSourceTest {
   private static final String DATA = "testData";
   private static final long TIMESTAMP = 1234L;
   private static final long REQ_TIME = 6373L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+  private static final String ID_ATTRIBUTE = "id";
   private static final String ACK_ID = "testAckId";
   private static final String RECORD_ID = "testRecordId";
   private static final int ACK_TIMEOUT_S = 60;
@@ -96,7 +96,7 @@ public class PubsubUnboundedSourceTest {
     PubsubUnboundedSource<String> source =
         new PubsubUnboundedSource<>(
             clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
-            StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
+            StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, null);
     primSource = new PubsubSource<>(source);
   }
 

Reply via email to