Repository: beam
Updated Branches:
refs/heads/master f5e3f5230 -> 14d60b26e
Renames {id,timestamp}Label to {id,timestamp}Attribute throughout SDK
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8853d53d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8853d53d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8853d53d
Branch: refs/heads/master
Commit: 8853d53d9ffdf6e68c80880f6dd5f2d11a6e451e
Parents: f065114
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Apr 27 17:19:14 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 2 +-
.../examples/complete/game/LeaderBoard.java | 2 +-
.../beam/runners/dataflow/DataflowRunner.java | 18 ++---
.../org/apache/beam/sdk/util/PropertyNames.java | 4 +-
.../beam/sdk/io/gcp/pubsub/PubsubClient.java | 42 ++++++-----
.../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 36 +++++-----
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 74 ++++++++++----------
.../sdk/io/gcp/pubsub/PubsubJsonClient.java | 36 +++++-----
.../sdk/io/gcp/pubsub/PubsubTestClient.java | 6 +-
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 58 ++++++++-------
.../io/gcp/pubsub/PubsubUnboundedSource.java | 61 +++++++++-------
.../sdk/io/gcp/pubsub/PubsubClientTest.java | 50 ++++++-------
.../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 16 +++--
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 24 +++----
.../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 14 ++--
.../io/gcp/pubsub/PubsubUnboundedSinkTest.java | 10 +--
.../gcp/pubsub/PubsubUnboundedSourceTest.java | 6 +-
17 files changed, 238 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git
a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index d628497..a46d3c5 100644
---
a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -252,7 +252,7 @@ public class GameStats extends LeaderBoard {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents = pipeline
.apply(PubsubIO.readStrings()
- .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
+ .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
// Extract username/score pairs from the event stream
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git
a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index fbffac6..9af34c5 100644
---
a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -191,7 +191,7 @@ public class LeaderBoard extends HourlyTeamScore {
// data elements, and parse the data.
PCollection<GameActionInfo> gameEvents = pipeline
.apply(PubsubIO.readStrings()
- .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
+ .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
gameEvents.apply("CalculateTeamScores",
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 63c2191..a61fe49 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -922,12 +922,13 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
((NestedValueProvider)
overriddenTransform.getSubscriptionProvider()).propertyName());
}
}
- if (overriddenTransform.getTimestampLabel() != null) {
+ if (overriddenTransform.getTimestampAttribute() != null) {
stepContext.addInput(
- PropertyNames.PUBSUB_TIMESTAMP_LABEL,
overriddenTransform.getTimestampLabel());
+ PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
overriddenTransform.getTimestampAttribute());
}
- if (overriddenTransform.getIdLabel() != null) {
- stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL,
overriddenTransform.getIdLabel());
+ if (overriddenTransform.getIdAttribute() != null) {
+ stepContext.addInput(
+ PropertyNames.PUBSUB_ID_ATTRIBUTE,
overriddenTransform.getIdAttribute());
}
if (overriddenTransform.getWithAttributesParseFn() != null) {
stepContext.addInput(
@@ -997,12 +998,13 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider)
overriddenTransform.getTopicProvider()).propertyName());
}
- if (overriddenTransform.getTimestampLabel() != null) {
+ if (overriddenTransform.getTimestampAttribute() != null) {
stepContext.addInput(
- PropertyNames.PUBSUB_TIMESTAMP_LABEL,
overriddenTransform.getTimestampLabel());
+ PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
overriddenTransform.getTimestampAttribute());
}
- if (overriddenTransform.getIdLabel() != null) {
- stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL,
overriddenTransform.getIdLabel());
+ if (overriddenTransform.getIdAttribute() != null) {
+ stepContext.addInput(
+ PropertyNames.PUBSUB_ID_ATTRIBUTE,
overriddenTransform.getIdAttribute());
}
if (overriddenTransform.getFormatFn() != null) {
stepContext.addInput(
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
index ee25448..aa5855b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
@@ -82,11 +82,11 @@ public class PropertyNames {
public static final String OUTPUT_NAME = "output_name";
public static final String PARALLEL_INPUT = "parallel_input";
public static final String PHASE = "phase";
- public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
+ public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN =
"pubsub_serialized_attributes_fn";
public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
public static final String PUBSUB_SUBSCRIPTION_OVERRIDE =
"pubsub_subscription_runtime_override";
- public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
+ public static final String PUBSUB_TIMESTAMP_ATTRIBUTE =
"pubsub_timestamp_label";
public static final String PUBSUB_TOPIC = "pubsub_topic";
public static final String PUBSUB_TOPIC_OVERRIDE =
"pubsub_topic_runtime_override";
public static final String SCALAR_FIELD_NAME = "value";
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 3a69799..cfe36ee 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -42,16 +42,15 @@ abstract class PubsubClient implements Closeable {
*/
public interface PubsubClientFactory extends Serializable {
/**
- * Construct a new Pubsub client. It should be closed via {@link #close}
in order
- * to ensure tidy cleanup of underlying netty resources (or use the
try-with-resources
- * construct). Uses {@code options} to derive pubsub endpoints and
application credentials.
- * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel}
to store custom
- * timestamps/ids within message metadata.
+ * Construct a new Pubsub client. It should be closed via {@link #close}
in order to ensure tidy
+ * cleanup of underlying netty resources (or use the try-with-resources
construct). Uses {@code
+ * options} to derive pubsub endpoints and application credentials. If
non-{@literal null}, use
+ * {@code timestampAttribute} and {@code idAttribute} to store custom
timestamps/ids within
+ * message metadata.
*/
PubsubClient newClient(
- @Nullable String timestampLabel,
- @Nullable String idLabel,
- PubsubOptions options) throws IOException;
+ @Nullable String timestampAttribute, @Nullable String idAttribute,
PubsubOptions options)
+ throws IOException;
/**
* Return the display name for this factory. Eg "Json", "gRPC".
@@ -86,33 +85,33 @@ abstract class PubsubClient implements Closeable {
* Return the timestamp (in ms since unix epoch) to use for a Pubsub message
with {@code
* attributes} and {@code pubsubTimestamp}.
*
- * <p>If {@code timestampLabel} is non-{@literal null} then the message
attributes must contain
- * that label, and the value of that label will be taken as the timestamp.
+ * <p>If {@code timestampAttribute} is non-{@literal null} then the message
attributes must
+ * contain that attribute, and the value of that attribute will be taken as
the timestamp.
* Otherwise the timestamp will be taken from the Pubsub publish timestamp
{@code
* pubsubTimestamp}.
*
* @throws IllegalArgumentException if the timestamp cannot be recognized as
a ms-since-unix-epoch
- * or RFC3339 time.
+ * or RFC3339 time.
*/
protected static long extractTimestamp(
- @Nullable String timestampLabel,
+ @Nullable String timestampAttribute,
@Nullable String pubsubTimestamp,
@Nullable Map<String, String> attributes) {
Long timestampMsSinceEpoch;
- if (Strings.isNullOrEmpty(timestampLabel)) {
+ if (Strings.isNullOrEmpty(timestampAttribute)) {
timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
checkArgument(timestampMsSinceEpoch != null,
"Cannot interpret PubSub publish timestamp: %s",
pubsubTimestamp);
} else {
- String value = attributes == null ? null :
attributes.get(timestampLabel);
+ String value = attributes == null ? null :
attributes.get(timestampAttribute);
checkArgument(value != null,
- "PubSub message is missing a value for timestamp label %s",
- timestampLabel);
+ "PubSub message is missing a value for timestamp attribute %s",
+ timestampAttribute);
timestampMsSinceEpoch = asMsSinceEpoch(value);
checkArgument(timestampMsSinceEpoch != null,
- "Cannot interpret value of label %s as timestamp: %s",
- timestampLabel, value);
+ "Cannot interpret value of attribute %s as timestamp: %s",
+ timestampAttribute, value);
}
return timestampMsSinceEpoch;
}
@@ -317,11 +316,10 @@ abstract class PubsubClient implements Closeable {
public final long timestampMsSinceEpoch;
/**
- * If using an id label, the record id to associate with this record's
metadata so the receiver
- * can reject duplicates. Otherwise {@literal null}.
+ * If using an id attribute, the record id to associate with this record's
metadata so the
+ * receiver can reject duplicates. Otherwise {@literal null}.
*/
- @Nullable
- public final String recordId;
+ @Nullable public final String recordId;
public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
long timestampMsSinceEpoch, @Nullable String
recordId) {
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 16de648..9778edf 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -79,7 +79,7 @@ class PubsubGrpcClient extends PubsubClient {
private static class PubsubGrpcClientFactory implements PubsubClientFactory {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute,
PubsubOptions options)
throws IOException {
ManagedChannel channel = NettyChannelBuilder
.forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
@@ -87,8 +87,8 @@ class PubsubGrpcClient extends PubsubClient {
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
.build();
- return new PubsubGrpcClient(timestampLabel,
- idLabel,
+ return new PubsubGrpcClient(timestampAttribute,
+ idAttribute,
DEFAULT_TIMEOUT_S,
channel,
options.getGcpCredential());
@@ -122,17 +122,17 @@ class PubsubGrpcClient extends PubsubClient {
private final Credentials credentials;
/**
- * Label to use for custom timestamps, or {@literal null} if should use
Pubsub publish time
+ * Attribute to use for custom timestamps, or {@literal null} if should use
Pubsub publish time
* instead.
*/
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
/**
- * Label to use for custom ids, or {@literal null} if should use Pubsub
provided ids.
+ * Attribute to use for custom ids, or {@literal null} if should use Pubsub
provided ids.
*/
@Nullable
- private final String idLabel;
+ private final String idAttribute;
/**
@@ -144,13 +144,13 @@ class PubsubGrpcClient extends PubsubClient {
@VisibleForTesting
PubsubGrpcClient(
- @Nullable String timestampLabel,
- @Nullable String idLabel,
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute,
int timeoutSec,
ManagedChannel publisherChannel,
Credentials credentials) {
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.timeoutSec = timeoutSec;
this.publisherChannel = publisherChannel;
this.credentials = credentials;
@@ -226,13 +226,13 @@ class PubsubGrpcClient extends PubsubClient {
message.putAllAttributes(outgoingMessage.attributes);
}
- if (timestampLabel != null) {
+ if (timestampAttribute != null) {
message.getMutableAttributes()
- .put(timestampLabel,
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ .put(timestampAttribute,
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
}
- if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId))
{
- message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
+ if (idAttribute != null &&
!Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ message.getMutableAttributes().put(idAttribute,
outgoingMessage.recordId);
}
request.addMessages(message);
@@ -273,7 +273,7 @@ class PubsubGrpcClient extends PubsubClient {
+ timestampProto.getNanos() /
1000L);
}
long timestampMsSinceEpoch =
- extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
+ extractTimestamp(timestampAttribute, pubsubTimestampString,
attributes);
// Ack id.
String ackId = message.getAckId();
@@ -281,8 +281,8 @@ class PubsubGrpcClient extends PubsubClient {
// Record id, if any.
@Nullable String recordId = null;
- if (idLabel != null && attributes != null) {
- recordId = attributes.get(idLabel);
+ if (idAttribute != null && attributes != null) {
+ recordId = attributes.get(idAttribute);
}
if (Strings.isNullOrEmpty(recordId)) {
// Fall back to the Pubsub provided message id.
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 3a7522e..129a25f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -136,12 +136,12 @@ public class PubsubIO {
* Populate common {@link DisplayData} between Pubsub source and sink.
*/
private static void populateCommonDisplayData(DisplayData.Builder builder,
- String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic)
{
+ String timestampAttribute, String idAttribute,
ValueProvider<PubsubTopic> topic) {
builder
- .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
- .withLabel("Timestamp Label Attribute"))
- .addIfNotNull(DisplayData.item("idLabel", idLabel)
- .withLabel("ID Label Attribute"));
+ .addIfNotNull(DisplayData.item("timestampAttribute",
timestampAttribute)
+ .withLabel("Timestamp Attribute"))
+ .addIfNotNull(DisplayData.item("idAttribute", idAttribute)
+ .withLabel("ID Attribute"));
if (topic != null) {
String topicString = topic.isAccessible() ? topic.get().asPath()
@@ -529,11 +529,11 @@ public class PubsubIO {
/** The name of the message attribute to read timestamps from. */
@Nullable
- abstract String getTimestampLabel();
+ abstract String getTimestampAttribute();
/** The name of the message attribute to read unique message IDs from. */
@Nullable
- abstract String getIdLabel();
+ abstract String getIdAttribute();
/** The coder used to decode each record. */
@Nullable
@@ -551,9 +551,9 @@ public class PubsubIO {
abstract Builder<T>
setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
- abstract Builder<T> setTimestampLabel(String timestampLabel);
+ abstract Builder<T> setTimestampAttribute(String timestampAttribute);
- abstract Builder<T> setIdLabel(String idLabel);
+ abstract Builder<T> setIdAttribute(String idAttribute);
abstract Builder<T> setCoder(Coder<T> coder);
@@ -633,7 +633,7 @@ public class PubsubIO {
* (i.e., time units smaller than milliseconds) will be ignored.
* </ul>
*
- * <p>If {@code timestampLabel} is not provided, the system will generate
record timestamps
+ * <p>If {@code timestampAttribute} is not provided, the system will
generate record timestamps
* the first time it sees each record. All windowing will be done relative
to these
* timestamps.
*
@@ -643,12 +643,12 @@ public class PubsubIO {
* specified with the windowing strategy – by default it will be
output immediately.
*
* <p>Note that the system can guarantee that no late data will ever be
seen when it assigns
- * timestamps by arrival time (i.e. {@code timestampLabel} is not
provided).
+ * timestamps by arrival time (i.e. {@code timestampAttribute} is not
provided).
*
* @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
- public Read<T> withTimestampLabel(String timestampLabel) {
- return toBuilder().setTimestampLabel(timestampLabel).build();
+ public Read<T> withTimestampAttribute(String timestampAttribute) {
+ return toBuilder().setTimestampAttribute(timestampAttribute).build();
}
/**
@@ -657,11 +657,11 @@ public class PubsubIO {
* The value of the attribute can be any string that uniquely identifies
this record.
*
* <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on
the Pub/Sub stream.
- * If {@code idLabel} is not provided, Beam cannot guarantee that no
duplicate data will
+ * If {@code idAttribute} is not provided, Beam cannot guarantee that no
duplicate data will
* be delivered, and deduplication of the stream will be strictly best
effort.
*/
- public Read<T> withIdLabel(String idLabel) {
- return toBuilder().setIdLabel(idLabel).build();
+ public Read<T> withIdAttribute(String idAttribute) {
+ return toBuilder().setIdAttribute(idAttribute).build();
}
/**
@@ -718,8 +718,8 @@ public class PubsubIO {
topicPath,
subscriptionPath,
getCoder(),
- getTimestampLabel(),
- getIdLabel(),
+ getTimestampAttribute(),
+ getIdAttribute(),
getParseFn());
return input.getPipeline().apply(source);
}
@@ -727,7 +727,8 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(),
getTopicProvider());
+ populateCommonDisplayData(
+ builder, getTimestampAttribute(), getIdAttribute(),
getTopicProvider());
if (getSubscriptionProvider() != null) {
String subscriptionString = getSubscriptionProvider().isAccessible()
@@ -757,11 +758,11 @@ public class PubsubIO {
/** The name of the message attribute to publish message timestamps in. */
@Nullable
- abstract String getTimestampLabel();
+ abstract String getTimestampAttribute();
/** The name of the message attribute to publish unique message IDs in. */
@Nullable
- abstract String getIdLabel();
+ abstract String getIdAttribute();
/** The input type Coder. */
@Nullable
@@ -777,9 +778,9 @@ public class PubsubIO {
abstract static class Builder<T> {
abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic>
topicProvider);
- abstract Builder<T> setTimestampLabel(String timestampLabel);
+ abstract Builder<T> setTimestampAttribute(String timestampAttribute);
- abstract Builder<T> setIdLabel(String idLabel);
+ abstract Builder<T> setIdAttribute(String idAttribute);
abstract Builder<T> setCoder(Coder<T> coder);
@@ -814,23 +815,23 @@ public class PubsubIO {
* time classes, {@link Instant#Instant(long)} can be used to parse this
value.
*
* <p>If the output from this sink is being read by another Beam pipeline,
then
- * {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure
the other source reads
- * these timestamps from the appropriate attribute.
+ * {@link PubsubIO.Read#withTimestampAttribute(String)} can be used to
ensure the other source
+ * reads these timestamps from the appropriate attribute.
*/
- public Write<T> withTimestampLabel(String timestampLabel) {
- return toBuilder().setTimestampLabel(timestampLabel).build();
+ public Write<T> withTimestampAttribute(String timestampAttribute) {
+ return toBuilder().setTimestampAttribute(timestampAttribute).build();
}
/**
* Writes to Pub/Sub, adding each record's unique identifier to the
published messages in an
* attribute with the specified name. The value of the attribute is an
opaque string.
*
- * <p>If the the output from this sink is being read by another Beam
pipeline, then
- * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that*
the other source reads
+ * <p>If the the output from this sink is being read by another Beam
pipeline, then {@link
+ * PubsubIO.Read#withIdAttribute(String)} can be used to ensure that* the
other source reads
* these unique identifiers from the appropriate attribute.
*/
- public Write<T> withIdLabel(String idLabel) {
- return toBuilder().setIdLabel(idLabel).build();
+ public Write<T> withIdAttribute(String idAttribute) {
+ return toBuilder().setIdAttribute(idAttribute).build();
}
/**
@@ -864,8 +865,8 @@ public class PubsubIO {
FACTORY,
NestedValueProvider.of(getTopicProvider(), new
TopicPathTranslator()),
getCoder(),
- getTimestampLabel(),
- getIdLabel(),
+ getTimestampAttribute(),
+ getIdAttribute(),
getFormatFn(),
100 /* numShards */));
}
@@ -875,7 +876,8 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(),
getTopicProvider());
+ populateCommonDisplayData(
+ builder, getTimestampAttribute(), getIdAttribute(),
getTopicProvider());
}
@Override
@@ -897,9 +899,9 @@ public class PubsubIO {
@StartBundle
public void startBundle(Context c) throws IOException {
this.output = new ArrayList<>();
- // NOTE: idLabel is ignored.
+ // NOTE: idAttribute is ignored.
this.pubsubClient =
- FACTORY.newClient(getTimestampLabel(), null,
+ FACTORY.newClient(getTimestampAttribute(), null,
c.getPipelineOptions().as(PubsubOptions.class));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 39184fb..b745422 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -69,7 +69,7 @@ class PubsubJsonClient extends PubsubClient {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute,
PubsubOptions options)
throws IOException {
Pubsub pubsub = new Builder(
Transport.getTransport(),
@@ -82,7 +82,7 @@ class PubsubJsonClient extends PubsubClient {
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace())
.build();
- return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+ return new PubsubJsonClient(timestampAttribute, idAttribute, pubsub);
}
@Override
@@ -97,17 +97,17 @@ class PubsubJsonClient extends PubsubClient {
public static final PubsubClientFactory FACTORY = new
PubsubJsonClientFactory();
/**
- * Label to use for custom timestamps, or {@literal null} if should use
Pubsub publish time
+ * Attribute to use for custom timestamps, or {@literal null} if should use
Pubsub publish time
* instead.
*/
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
/**
- * Label to use for custom ids, or {@literal null} if should use Pubsub
provided ids.
+ * Attribute to use for custom ids, or {@literal null} if should use Pubsub
provided ids.
*/
@Nullable
- private final String idLabel;
+ private final String idAttribute;
/**
* Underlying JSON transport.
@@ -116,11 +116,11 @@ class PubsubJsonClient extends PubsubClient {
@VisibleForTesting
PubsubJsonClient(
- @Nullable String timestampLabel,
- @Nullable String idLabel,
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute,
Pubsub pubsub) {
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.pubsub = pubsub;
}
@@ -137,19 +137,19 @@ class PubsubJsonClient extends PubsubClient {
PubsubMessage pubsubMessage = new
PubsubMessage().encodeData(outgoingMessage.elementBytes);
Map<String, String> attributes = outgoingMessage.attributes;
- if ((timestampLabel != null || idLabel != null) && attributes == null) {
+ if ((timestampAttribute != null || idAttribute != null) && attributes ==
null) {
attributes = new TreeMap<>();
}
if (attributes != null) {
pubsubMessage.setAttributes(attributes);
}
- if (timestampLabel != null) {
- attributes.put(timestampLabel,
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ if (timestampAttribute != null) {
+ attributes.put(timestampAttribute,
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
}
- if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId))
{
- attributes.put(idLabel, outgoingMessage.recordId);
+ if (idAttribute != null &&
!Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ attributes.put(idAttribute, outgoingMessage.recordId);
}
pubsubMessages.add(pubsubMessage);
@@ -188,7 +188,7 @@ class PubsubJsonClient extends PubsubClient {
// Timestamp.
long timestampMsSinceEpoch =
- extractTimestamp(timestampLabel,
message.getMessage().getPublishTime(), attributes);
+ extractTimestamp(timestampAttribute,
message.getMessage().getPublishTime(), attributes);
// Ack id.
String ackId = message.getAckId();
@@ -196,8 +196,8 @@ class PubsubJsonClient extends PubsubClient {
// Record id, if any.
@Nullable String recordId = null;
- if (idLabel != null && attributes != null) {
- recordId = attributes.get(idLabel);
+ if (idAttribute != null && attributes != null) {
+ recordId = attributes.get(idAttribute);
}
if (Strings.isNullOrEmpty(recordId)) {
// Fall back to the Pubsub provided message id.
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index 9d40e41..df90597 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -136,7 +136,7 @@ class PubsubTestClient extends PubsubClient implements
Serializable {
return new PubsubTestClientFactory() {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute,
PubsubOptions options)
throws IOException {
return new PubsubTestClient();
}
@@ -182,7 +182,7 @@ class PubsubTestClient extends PubsubClient implements
Serializable {
return new PubsubTestClientFactory() {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute,
PubsubOptions options)
throws IOException {
return new PubsubTestClient();
}
@@ -226,7 +226,7 @@ class PubsubTestClient extends PubsubClient implements
Serializable {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel,
PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute,
PubsubOptions options)
throws IOException {
return new PubsubTestClient() {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 002e979..8d273ba 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -224,8 +224,8 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
private final PubsubClientFactory pubsubFactory;
private final ValueProvider<TopicPath> topic;
- private final String timestampLabel;
- private final String idLabel;
+ private final String timestampAttribute;
+ private final String idAttribute;
private final int publishBatchSize;
private final int publishBatchBytes;
@@ -240,12 +240,16 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
private final Counter byteCounter = SinkMetrics.bytesWritten();
WriterFn(
- PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
- String timestampLabel, String idLabel, int publishBatchSize, int
publishBatchBytes) {
+ PubsubClientFactory pubsubFactory,
+ ValueProvider<TopicPath> topic,
+ String timestampAttribute,
+ String idAttribute,
+ int publishBatchSize,
+ int publishBatchBytes) {
this.pubsubFactory = pubsubFactory;
this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.publishBatchSize = publishBatchSize;
this.publishBatchBytes = publishBatchBytes;
}
@@ -267,7 +271,7 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
@StartBundle
public void startBundle(Context c) throws Exception {
checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
- pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
+ pubsubClient = pubsubFactory.newClient(timestampAttribute, idAttribute,
c.getPipelineOptions().as(PubsubOptions.class));
}
@@ -311,8 +315,8 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
: topic.toString();
builder.add(DisplayData.item("topic", topicString));
builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
- builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
- builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+ builder.addIfNotNull(DisplayData.item("timestampAttribute",
timestampAttribute));
+ builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
}
}
@@ -341,14 +345,14 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
* Pubsub message publish timestamp instead.
*/
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
/**
* Pubsub metadata field holding id for each element, or {@literal null} if
need to generate
* a unique id ourselves.
*/
@Nullable
- private final String idLabel;
+ private final String idAttribute;
/**
* Number of 'shards' to use so that latency in Pubsub publish can be
hidden. Generally this
@@ -374,7 +378,7 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
private final Duration maxLatency;
/**
- * How record ids should be generated for each record (if {@link #idLabel}
is non-{@literal
+ * How record ids should be generated for each record (if {@link
#idAttribute} is non-{@literal
* null}).
*/
private final RecordIdMethod recordIdMethod;
@@ -390,8 +394,8 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
PubsubClientFactory pubsubFactory,
ValueProvider<TopicPath> topic,
Coder<T> elementCoder,
- String timestampLabel,
- String idLabel,
+ String timestampAttribute,
+ String idAttribute,
int numShards,
int publishBatchSize,
int publishBatchBytes,
@@ -401,25 +405,25 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
this.pubsubFactory = pubsubFactory;
this.topic = topic;
this.elementCoder = elementCoder;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.numShards = numShards;
this.publishBatchSize = publishBatchSize;
this.publishBatchBytes = publishBatchBytes;
this.maxLatency = maxLatency;
this.formatFn = formatFn;
- this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE :
recordIdMethod;
+ this.recordIdMethod = idAttribute == null ? RecordIdMethod.NONE :
recordIdMethod;
}
public PubsubUnboundedSink(
PubsubClientFactory pubsubFactory,
ValueProvider<TopicPath> topic,
Coder<T> elementCoder,
- String timestampLabel,
- String idLabel,
+ String timestampAttribute,
+ String idAttribute,
SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
int numShards) {
- this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel,
numShards,
+ this(pubsubFactory, topic, elementCoder, timestampAttribute, idAttribute,
numShards,
DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES,
DEFAULT_MAX_LATENCY,
formatFn, RecordIdMethod.RANDOM);
}
@@ -439,19 +443,19 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
}
/**
- * Get the timestamp label.
+ * Get the timestamp attribute.
*/
@Nullable
- public String getTimestampLabel() {
- return timestampLabel;
+ public String getTimestampAttribute() {
+ return timestampAttribute;
}
/**
- * Get the id label.
+ * Get the id attribute.
*/
@Nullable
- public String getIdLabel() {
- return idLabel;
+ public String getIdAttribute() {
+ return idAttribute;
}
/**
@@ -483,7 +487,7 @@ public class PubsubUnboundedSink<T> extends
PTransform<PCollection<T>, PDone> {
.setCoder(KvCoder.of(VarIntCoder.of(), CODER))
.apply(GroupByKey.<Integer, OutgoingMessage>create())
.apply("PubsubUnboundedSink.Writer",
- ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel,
idLabel,
+ ParDo.of(new WriterFn(pubsubFactory, topic, timestampAttribute,
idAttribute,
publishBatchSize, publishBatchBytes)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 6392fd2..903ae41 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -602,7 +602,7 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
pubsubClient =
new AtomicReference<>(
outer.outer.pubsubFactory.newClient(
- outer.outer.timestampLabel, outer.outer.idLabel, options));
+ outer.outer.timestampAttribute, outer.outer.idAttribute,
options));
ackTimeoutMs = -1;
safeToAckIds = new HashSet<>();
notYetRead = new ArrayDeque<>();
@@ -1207,22 +1207,22 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
@Nullable
private final ValueProvider<TopicPath> topic;
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
@Nullable
- private final String idLabel;
+ private final String idAttribute;
public StatsFn(
PubsubClientFactory pubsubFactory,
@Nullable ValueProvider<SubscriptionPath> subscription,
@Nullable ValueProvider<TopicPath> topic,
- @Nullable String timestampLabel,
- @Nullable String idLabel) {
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute) {
checkArgument(pubsubFactory != null, "pubsubFactory should not be null");
this.pubsubFactory = pubsubFactory;
this.subscription = subscription;
this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
}
@ProcessElement
@@ -1247,8 +1247,8 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
builder.add(DisplayData.item("topic", topicString));
}
builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
- builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
- builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+ builder.addIfNotNull(DisplayData.item("timestampAttribute",
timestampAttribute));
+ builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
}
}
@@ -1303,14 +1303,14 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
* Pubsub message publish timestamp instead.
*/
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
/**
* Pubsub metadata field holding id for each element, or {@literal null} if
need to generate
* a unique id ourselves.
*/
@Nullable
- private final String idLabel;
+ private final String idAttribute;
/**
* If not {@literal null}, the user is asking for PubSub attributes. This
parse function will be
@@ -1327,8 +1327,8 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
@Nullable ValueProvider<TopicPath> topic,
@Nullable ValueProvider<SubscriptionPath> subscription,
Coder<T> elementCoder,
- @Nullable String timestampLabel,
- @Nullable String idLabel,
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute,
@Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
checkArgument((topic == null) != (subscription == null),
"Exactly one of topic and subscription must be given");
@@ -1340,8 +1340,8 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
this.topic = topic;
this.subscription = subscription;
this.elementCoder = checkNotNull(elementCoder);
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.parseFn = parseFn;
}
@@ -1354,10 +1354,18 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
@Nullable ValueProvider<TopicPath> topic,
@Nullable ValueProvider<SubscriptionPath> subscription,
Coder<T> elementCoder,
- @Nullable String timestampLabel,
- @Nullable String idLabel,
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute,
@Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
- this(null, pubsubFactory, project, topic, subscription, elementCoder,
timestampLabel, idLabel,
+ this(
+ null,
+ pubsubFactory,
+ project,
+ topic,
+ subscription,
+ elementCoder,
+ timestampAttribute,
+ idAttribute,
parseFn);
}
@@ -1409,19 +1417,19 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
}
/**
- * Get the timestamp label.
+ * Get the timestamp attribute.
*/
@Nullable
- public String getTimestampLabel() {
- return timestampLabel;
+ public String getTimestampAttribute() {
+ return timestampAttribute;
}
/**
- * Get the id label.
+ * Get the id attribute.
*/
@Nullable
- public String getIdLabel() {
- return idLabel;
+ public String getIdAttribute() {
+ return idAttribute;
}
/**
@@ -1438,13 +1446,14 @@ public class PubsubUnboundedSource<T> extends
PTransform<PBegin, PCollection<T>>
.apply(Read.from(new PubsubSource<T>(this)))
.apply("PubsubUnboundedSource.Stats",
ParDo.of(new StatsFn<T>(
- pubsubFactory, subscription, topic, timestampLabel,
idLabel)));
+ pubsubFactory, subscription, topic,
timestampAttribute, idAttribute)));
}
private SubscriptionPath createRandomSubscription(PipelineOptions options) {
try {
try (PubsubClient pubsubClient =
- pubsubFactory.newClient(timestampLabel, idLabel,
options.as(PubsubOptions.class))) {
+ pubsubFactory.newClient(
+ timestampAttribute, idAttribute,
options.as(PubsubOptions.class))) {
checkState(project.isAccessible(), "createRandomSubscription must be called at runtime.");
checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime.");
SubscriptionPath subscriptionPath =
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
index 14c36f9..d37235f 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
@@ -45,8 +45,8 @@ public class PubsubClientTest {
//
private long parse(String timestamp) {
- Map<String, String> map = ImmutableMap.of("myLabel", timestamp);
- return PubsubClient.extractTimestamp("myLabel", null, map);
+ Map<String, String> map = ImmutableMap.of("myAttribute", timestamp);
+ return PubsubClient.extractTimestamp("myAttribute", null, map);
}
private void roundTripRfc339(String timestamp) {
@@ -58,106 +58,106 @@ public class PubsubClientTest {
}
@Test
- public void noTimestampLabelReturnsPubsubPublish() {
+ public void noTimestampAttributeReturnsPubsubPublish() {
final long time = 987654321L;
long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time),
null);
assertEquals(time, timestamp);
}
@Test
- public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
+ public void noTimestampAttributeAndInvalidPubsubPublishThrowsError() {
thrown.expect(NumberFormatException.class);
PubsubClient.extractTimestamp(null, "not-a-date", null);
}
@Test
- public void timestampLabelWithNullAttributesThrowsError() {
+ public void timestampAttributeWithNullAttributesThrowsError() {
thrown.expect(RuntimeException.class);
- thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
- PubsubClient.extractTimestamp("myLabel", null, null);
+ thrown.expectMessage("PubSub message is missing a value for timestamp attribute myAttribute");
+ PubsubClient.extractTimestamp("myAttribute", null, null);
}
@Test
- public void timestampLabelSetWithMissingAttributeThrowsError() {
+ public void timestampAttributeSetWithMissingAttributeThrowsError() {
thrown.expect(RuntimeException.class);
- thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+ thrown.expectMessage("PubSub message is missing a value for timestamp attribute myAttribute");
Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
- PubsubClient.extractTimestamp("myLabel", null, map);
+ PubsubClient.extractTimestamp("myAttribute", null, map);
}
@Test
- public void timestampLabelParsesMillisecondsSinceEpoch() {
+ public void timestampAttributeParsesMillisecondsSinceEpoch() {
long time = 1446162101123L;
- Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
- long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
+ Map<String, String> map = ImmutableMap.of("myAttribute",
String.valueOf(time));
+ long timestamp = PubsubClient.extractTimestamp("myAttribute", null, map);
assertEquals(time, timestamp);
}
@Test
- public void timestampLabelParsesRfc3339Seconds() {
+ public void timestampAttributeParsesRfc3339Seconds() {
roundTripRfc339("2015-10-29T23:41:41Z");
}
@Test
- public void timestampLabelParsesRfc3339Tenths() {
+ public void timestampAttributeParsesRfc3339Tenths() {
roundTripRfc339("2015-10-29T23:41:41.1Z");
}
@Test
- public void timestampLabelParsesRfc3339Hundredths() {
+ public void timestampAttributeParsesRfc3339Hundredths() {
roundTripRfc339("2015-10-29T23:41:41.12Z");
}
@Test
- public void timestampLabelParsesRfc3339Millis() {
+ public void timestampAttributeParsesRfc3339Millis() {
roundTripRfc339("2015-10-29T23:41:41.123Z");
}
@Test
- public void timestampLabelParsesRfc3339Micros() {
+ public void timestampAttributeParsesRfc3339Micros() {
// Note: micros part 456/1000 is dropped.
truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
}
@Test
- public void timestampLabelParsesRfc3339MicrosRounding() {
+ public void timestampAttributeParsesRfc3339MicrosRounding() {
// Note: micros part 999/1000 is dropped, not rounded up.
truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
}
@Test
- public void timestampLabelWithInvalidFormatThrowsError() {
+ public void timestampAttributeWithInvalidFormatThrowsError() {
thrown.expect(NumberFormatException.class);
parse("not-a-timestamp");
}
@Test
- public void timestampLabelWithInvalidFormat2ThrowsError() {
+ public void timestampAttributeWithInvalidFormat2ThrowsError() {
thrown.expect(NumberFormatException.class);
parse("null");
}
@Test
- public void timestampLabelWithInvalidFormat3ThrowsError() {
+ public void timestampAttributeWithInvalidFormat3ThrowsError() {
thrown.expect(NumberFormatException.class);
parse("2015-10");
}
@Test
- public void timestampLabelParsesRfc3339WithSmallYear() {
+ public void timestampAttributeParsesRfc3339WithSmallYear() {
// Google and JodaTime agree on dates after 1582-10-15, when the Gregorian
Calendar was adopted
// This is therefore a "small year" until this difference is reconciled.
roundTripRfc339("1582-10-15T01:23:45.123Z");
}
@Test
- public void timestampLabelParsesRfc3339WithLargeYear() {
+ public void timestampAttributeParsesRfc3339WithLargeYear() {
// Year 9999 in range.
roundTripRfc339("9999-10-29T23:41:41.123999Z");
}
@Test
- public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
+ public void timestampAttributeRfc3339WithTooLargeYearThrowsError() {
thrown.expect(NumberFormatException.class);
// Year 10000 out of range.
parse("10000-10-29T23:41:41.123999Z");
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
index 63721dc..87d6029 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
@@ -72,8 +72,8 @@ public class PubsubGrpcClientTest {
private static final long REQ_TIME = 1234L;
private static final long PUB_TIME = 3456L;
private static final long MESSAGE_TIME = 6789L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+ private static final String ID_ATTRIBUTE = "id";
private static final String MESSAGE_ID = "testMessageId";
private static final String DATA = "testData";
private static final String RECORD_ID = "testRecordId";
@@ -87,7 +87,9 @@ public class PubsubGrpcClientTest {
PubsubGrpcClientTest.class.getName(),
ThreadLocalRandom.current().nextInt());
inProcessChannel =
InProcessChannelBuilder.forName(channelName).directExecutor().build();
testCredentials = new TestCredential();
- client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10,
inProcessChannel, testCredentials);
+ client =
+ new PubsubGrpcClient(
+ TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, inProcessChannel,
testCredentials);
}
@After
@@ -117,9 +119,9 @@ public class PubsubGrpcClientTest {
.setPublishTime(timestamp)
.putAllAttributes(ATTRIBUTES)
.putAllAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL,
+ ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID))
+ ID_ATTRIBUTE, RECORD_ID))
.build();
ReceivedMessage expectedReceivedMessage =
ReceivedMessage.newBuilder()
@@ -167,8 +169,8 @@ public class PubsubGrpcClientTest {
.setData(ByteString.copyFrom(DATA.getBytes()))
.putAllAttributes(ATTRIBUTES)
.putAllAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL,
String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID))
+ ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
String.valueOf(MESSAGE_TIME),
+ ID_ATTRIBUTE, RECORD_ID))
.build();
final PublishRequest expectedRequest =
PublishRequest.newBuilder()
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 20039d4..5f06b88 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -93,14 +93,14 @@ public class PubsubIOTest {
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
.fromTopic(StaticValueProvider.of(topic))
- .withTimestampLabel("myTimestamp")
- .withIdLabel("myId");
+ .withTimestampAttribute("myTimestamp")
+ .withIdAttribute("myId");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("topic", topic));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("timestampAttribute",
"myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
}
@Test
@@ -110,14 +110,14 @@ public class PubsubIOTest {
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
.fromSubscription(StaticValueProvider.of(subscription))
- .withTimestampLabel("myTimestamp")
- .withIdLabel("myId");
+ .withTimestampAttribute("myTimestamp")
+ .withIdAttribute("myId");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("subscription", subscription));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("timestampAttribute",
"myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
}
@Test
@@ -168,14 +168,14 @@ public class PubsubIOTest {
String topic = "projects/project/topics/topic";
PubsubIO.Write<?> write = PubsubIO.<String>write()
.to(topic)
- .withTimestampLabel("myTimestamp")
- .withIdLabel("myId");
+ .withTimestampAttribute("myTimestamp")
+ .withIdAttribute("myId");
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("topic", topic));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("timestampAttribute",
"myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index d290994..578f814 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -58,8 +58,8 @@ public class PubsubJsonClientTest {
private static final long REQ_TIME = 1234L;
private static final long PUB_TIME = 3456L;
private static final long MESSAGE_TIME = 6789L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+ private static final String ID_ATTRIBUTE = "id";
private static final String MESSAGE_ID = "testMessageId";
private static final String DATA = "testData";
private static final String RECORD_ID = "testRecordId";
@@ -68,7 +68,7 @@ public class PubsubJsonClientTest {
@Before
public void setup() throws IOException {
mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
- client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+ client = new PubsubJsonClient(TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE,
mockPubsub);
}
@After
@@ -88,8 +88,8 @@ public class PubsubJsonClientTest {
.encodeData(DATA.getBytes())
.setPublishTime(String.valueOf(PUB_TIME))
.setAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID));
+ ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME),
+ ID_ATTRIBUTE, RECORD_ID));
ReceivedMessage expectedReceivedMessage =
new ReceivedMessage().setMessage(expectedPubsubMessage)
.setAckId(ACK_ID);
@@ -117,8 +117,8 @@ public class PubsubJsonClientTest {
.encodeData(DATA.getBytes())
.setAttributes(
ImmutableMap.<String, String> builder()
- .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
- .put(ID_LABEL, RECORD_ID)
+ .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
+ .put(ID_ATTRIBUTE, RECORD_ID)
.put("k", "v").build());
PublishRequest expectedRequest = new PublishRequest()
.setMessages(ImmutableList.of(expectedPubsubMessage));
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index be425d4..580ada9 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
private static final Map<String, String> ATTRIBUTES =
ImmutableMap.<String, String>builder().put("a", "b").put("c",
"d").build();
private static final long TIMESTAMP = 1234L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+ private static final String ID_ATTRIBUTE = "id";
private static final int NUM_SHARDS = 10;
private static class Stamp extends DoFn<String, String> {
@@ -99,7 +99,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
StringUtf8Coder.of(),
- TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+ TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, batchSize,
batchBytes,
Duration.standardSeconds(2),
new SimpleFunction<String, PubsubIO.PubsubMessage>() {
@Override
@@ -135,7 +135,7 @@ public class PubsubUnboundedSinkTest implements
Serializable {
ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
StringUtf8Coder.of(),
- TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+ TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, batchSize,
batchBytes,
Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
p.apply(Create.of(data))
.apply(ParDo.of(new Stamp()))
@@ -170,7 +170,7 @@ public class PubsubUnboundedSinkTest implements
Serializable {
ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
- StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE,
NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
null, RecordIdMethod.DETERMINISTIC);
p.apply(Create.of(data))
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 949ba4f..dc66ea1 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -70,8 +70,8 @@ public class PubsubUnboundedSourceTest {
private static final String DATA = "testData";
private static final long TIMESTAMP = 1234L;
private static final long REQ_TIME = 6373L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+ private static final String ID_ATTRIBUTE = "id";
private static final String ACK_ID = "testAckId";
private static final String RECORD_ID = "testRecordId";
private static final int ACK_TIMEOUT_S = 60;
@@ -96,7 +96,7 @@ public class PubsubUnboundedSourceTest {
PubsubUnboundedSource<String> source =
new PubsubUnboundedSource<>(
clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
- StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
+ StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, null);
primSource = new PubsubSource<>(source);
}