scwhittle commented on code in PR #26063:
URL: https://github.com/apache/beam/pull/26063#discussion_r1168369739
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java:
##########
@@ -2341,6 +2347,97 @@ public void
testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
verifyGroupIntoBatchesOverrideBytes(p, true, true);
}
+ @Test
+ public void testPubsubSinkOverride() throws IOException {
+ PipelineOptions options = buildPipelineOptions();
+ List<String> experiments =
+ new ArrayList<>(
+ ImmutableList.of(
+ GcpOptions.STREAMING_ENGINE_EXPERIMENT,
+ GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
+ "use_runner_v2"));
+ DataflowPipelineOptions dataflowOptions =
options.as(DataflowPipelineOptions.class);
+ dataflowOptions.setExperiments(experiments);
+ dataflowOptions.setStreaming(true);
+ Pipeline p = Pipeline.create(options);
+
+ List<PubsubMessage> testValues =
+ Arrays.asList(
+ new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8),
Collections.emptyMap()));
+ PCollection<PubsubMessage> input =
+ p.apply("CreateValuesBytes", Create.of(testValues))
+ .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+ input.apply(PubsubIO.writeMessages().to("projects/project/topics/topic"));
+ p.run();
+
+ AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+ p.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
Review Comment:
It should only be a composite or a primitive, right?
Can you simplify this test and remove the unnecessary override here?
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java:
##########
@@ -46,11 +46,15 @@ public class PropertyNames {
public static final String PARALLEL_INPUT = "parallel_input";
public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN =
"pubsub_serialized_attributes_fn";
+
public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
public static final String PUBSUB_SUBSCRIPTION_OVERRIDE =
"pubsub_subscription_runtime_override";
public static final String PUBSUB_TIMESTAMP_ATTRIBUTE =
"pubsub_timestamp_label";
public static final String PUBSUB_TOPIC = "pubsub_topic";
public static final String PUBSUB_TOPIC_OVERRIDE =
"pubsub_topic_runtime_override";
+
+ public static final String PUBSUB_DYNAMIC_DESTINATIONS =
"pubsub_with_dynamic_destinations";
Review Comment:
nit: use pubsub_dynamic_destinations to match the enum? i.e. remove "with"
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+ private final String timestampLabel;
+ private final String idLabel;
+ private final StreamingModeExecutionContext context;
+ // Function used to convert PCollection elements to PubsubMessage objects.
+
+ PubsubDynamicSink(String timestampLabel, String idLabel,
StreamingModeExecutionContext context) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.context = context;
+ }
+
+ /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+ @AutoService(SinkFactory.Registrar.class)
+ public static class Registrar implements SinkFactory.Registrar {
+
+ @Override
+ public Map<String, SinkFactory> factories() {
+ PubsubDynamicSink.Factory factory = new Factory();
+ return ImmutableMap.of(
+ "PubsubDynamicSink",
+ factory,
+ "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+ factory);
+ }
+ }
+
+ public static class Factory implements SinkFactory {
+ @Override
+ public PubsubDynamicSink create(
+ CloudObject spec,
+ Coder<?> coder,
+ @Nullable PipelineOptions options,
+ @Nullable DataflowExecutionContext executionContext,
+ DataflowOperationContext operationContext)
+ throws Exception {
+ String timestampLabel = getString(spec,
PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
+ String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");
+
+ return new PubsubDynamicSink(
+ timestampLabel, idLabel, (StreamingModeExecutionContext)
executionContext);
+ }
+ }
+
+ @Override
+ public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
+ return new PubsubDynamicSink.PubsubWriter();
+ }
+
+ class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
+ private Map<String, Windmill.PubSubMessageBundle.Builder> outputBuilders;
Review Comment:
final for these?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+ private final String timestampLabel;
+ private final String idLabel;
+ private final StreamingModeExecutionContext context;
+ // Function used to convert PCollection elements to PubsubMessage objects.
+
+ PubsubDynamicSink(String timestampLabel, String idLabel,
StreamingModeExecutionContext context) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.context = context;
+ }
+
+ /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+ @AutoService(SinkFactory.Registrar.class)
+ public static class Registrar implements SinkFactory.Registrar {
+
+ @Override
+ public Map<String, SinkFactory> factories() {
+ PubsubDynamicSink.Factory factory = new Factory();
+ return ImmutableMap.of(
+ "PubsubDynamicSink",
+ factory,
+ "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+ factory);
+ }
+ }
+
+ public static class Factory implements SinkFactory {
Review Comment:
Can this be private or package-private? Or perhaps even inlined in
factories() above?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+ private final String timestampLabel;
+ private final String idLabel;
+ private final StreamingModeExecutionContext context;
+ // Function used to convert PCollection elements to PubsubMessage objects.
+
+ PubsubDynamicSink(String timestampLabel, String idLabel,
StreamingModeExecutionContext context) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.context = context;
+ }
+
+ /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+ @AutoService(SinkFactory.Registrar.class)
+ public static class Registrar implements SinkFactory.Registrar {
+
+ @Override
+ public Map<String, SinkFactory> factories() {
+ PubsubDynamicSink.Factory factory = new Factory();
+ return ImmutableMap.of(
+ "PubsubDynamicSink",
+ factory,
+ "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+ factory);
+ }
+ }
+
+ public static class Factory implements SinkFactory {
+ @Override
+ public PubsubDynamicSink create(
+ CloudObject spec,
+ Coder<?> coder,
+ @Nullable PipelineOptions options,
+ @Nullable DataflowExecutionContext executionContext,
+ DataflowOperationContext operationContext)
+ throws Exception {
+ String timestampLabel = getString(spec,
PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
+ String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");
+
+ return new PubsubDynamicSink(
+ timestampLabel, idLabel, (StreamingModeExecutionContext)
executionContext);
+ }
+ }
+
+ @Override
+ public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
+ return new PubsubDynamicSink.PubsubWriter();
+ }
+
+ class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
+ private Map<String, Windmill.PubSubMessageBundle.Builder> outputBuilders;
+ private ByteStringOutputStream stream; // Kept across adds for buffer
reuse.
+
+ PubsubWriter() {
+ outputBuilders = Maps.newHashMap();
+ stream = new ByteStringOutputStream();
+ }
+
+ public ByteString getDataFromMessage(PubsubMessage formatted,
ByteStringOutputStream stream)
+ throws IOException {
+ Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
+
Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));
+ if (formatted.getAttributeMap() != null) {
+ pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
+ }
+ pubsubMessageBuilder.build().writeTo(stream);
+ return stream.toByteStringAndReset();
+ }
+
+ public void close(Windmill.PubSubMessageBundle.Builder outputBuilder)
throws IOException {
+ Windmill.PubSubMessageBundle pubsubMessages = outputBuilder.build();
+ if (pubsubMessages.getMessagesCount() > 0) {
+ context.getOutputBuilder().addPubsubMessages(pubsubMessages);
Review Comment:
Can you avoid the build() above and instead add the builder here?
Or is there a reason not to add directly to the
context.getOutputBuilder() in add()? You can still keep the map to identify the
bundle, but just add directly to it.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java:
##########
@@ -114,6 +115,7 @@ public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubSink> transform,
SdkComponents components) {
PubSubWritePayload.Builder payloadBuilder =
PubSubWritePayload.newBuilder();
+ @Nullable
Review Comment:
It seems you need a null check below if this is nullable.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
* reviewers mentioned <a
*
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
* here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline
start time.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos =
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros =
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
Review Comment:
also show string and proto examples and not just avro?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
* reviewers mentioned <a
*
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
* here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline
start time.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos =
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros =
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to
extract the topic from
Review Comment:
Provide a link and mirror the other text?
In order to write to dynamic topic destinations, the {@link ...} methods can
be used, specifying a function to extract the topic from the record.
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java:
##########
@@ -2341,6 +2347,97 @@ public void
testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
verifyGroupIntoBatchesOverrideBytes(p, true, true);
}
+ @Test
+ public void testPubsubSinkOverride() throws IOException {
+ PipelineOptions options = buildPipelineOptions();
+ List<String> experiments =
+ new ArrayList<>(
+ ImmutableList.of(
+ GcpOptions.STREAMING_ENGINE_EXPERIMENT,
+ GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
+ "use_runner_v2"));
+ DataflowPipelineOptions dataflowOptions =
options.as(DataflowPipelineOptions.class);
+ dataflowOptions.setExperiments(experiments);
+ dataflowOptions.setStreaming(true);
+ Pipeline p = Pipeline.create(options);
+
+ List<PubsubMessage> testValues =
+ Arrays.asList(
+ new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8),
Collections.emptyMap()));
+ PCollection<PubsubMessage> input =
+ p.apply("CreateValuesBytes", Create.of(testValues))
+ .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+ input.apply(PubsubIO.writeMessages().to("projects/project/topics/topic"));
+ p.run();
+
+ AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+ p.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ if (node.getTransform() instanceof
DataflowRunner.StreamingPubsubIOWrite) {
+ sawPubsubOverride.set(true);
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void visitPrimitiveTransform(@UnknownKeyFor @NonNull
@Initialized Node node) {
+ if (node.getTransform() instanceof
DataflowRunner.StreamingPubsubIOWrite) {
+ sawPubsubOverride.set(true);
+ }
+ }
+ });
+ assertTrue(sawPubsubOverride.get());
+ }
+
+ @Test
+ public void testPubsubSinkDynamicOverride() throws IOException {
+ PipelineOptions options = buildPipelineOptions();
+ List<String> experiments =
+ new ArrayList<>(
+ ImmutableList.of(
+ GcpOptions.STREAMING_ENGINE_EXPERIMENT,
+ GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
+ "use_runner_v2"));
+ DataflowPipelineOptions dataflowOptions =
options.as(DataflowPipelineOptions.class);
+ dataflowOptions.setExperiments(experiments);
+ dataflowOptions.setStreaming(true);
+ Pipeline p = Pipeline.create(options);
+
+ List<PubsubMessage> testValues =
+ Arrays.asList(
+ new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8),
Collections.emptyMap())
+ .withTopic(""));
+ PCollection<PubsubMessage> input =
+ p.apply("CreateValuesBytes", Create.of(testValues))
+ .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+ input.apply(PubsubIO.writeMessagesDynamic());
+ p.run();
+
+ AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+ p.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
Review Comment:
ditto
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
Review Comment:
Since this is new, can you avoid suppressing these?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java:
##########
@@ -357,26 +357,33 @@ public static TopicPath topicPathFromName(String
projectId, String topicName) {
public abstract static class OutgoingMessage implements Serializable {
/** Underlying Message. May not have publish timestamp set. */
- public abstract PubsubMessage message();
+ public abstract PubsubMessage getMessage();
/** Timestamp for element (ms since epoch). */
- public abstract long timestampMsSinceEpoch();
+ public abstract long getTimestampMsSinceEpoch();
/**
* If using an id attribute, the record id to associate with this record's
metadata so the
* receiver can reject duplicates. Otherwise {@literal null}.
*/
public abstract @Nullable String recordId();
+ public abstract @Nullable String topic();
+
public static OutgoingMessage of(
- PubsubMessage message, long timestampMsSinceEpoch, @Nullable String
recordId) {
- return new AutoValue_PubsubClient_OutgoingMessage(message,
timestampMsSinceEpoch, recordId);
+ PubsubMessage message,
+ long timestampMsSinceEpoch,
+ @Nullable String recordId,
+ String topic) {
Review Comment:
Should topic be nullable here, or should the accessor be non-nullable?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java:
##########
@@ -357,26 +357,33 @@ public static TopicPath topicPathFromName(String
projectId, String topicName) {
public abstract static class OutgoingMessage implements Serializable {
/** Underlying Message. May not have publish timestamp set. */
- public abstract PubsubMessage message();
+ public abstract PubsubMessage getMessage();
/** Timestamp for element (ms since epoch). */
- public abstract long timestampMsSinceEpoch();
+ public abstract long getTimestampMsSinceEpoch();
/**
* If using an id attribute, the record id to associate with this record's
metadata so the
* receiver can reject duplicates. Otherwise {@literal null}.
*/
public abstract @Nullable String recordId();
+ public abstract @Nullable String topic();
+
public static OutgoingMessage of(
- PubsubMessage message, long timestampMsSinceEpoch, @Nullable String
recordId) {
- return new AutoValue_PubsubClient_OutgoingMessage(message,
timestampMsSinceEpoch, recordId);
+ PubsubMessage message,
+ long timestampMsSinceEpoch,
+ @Nullable String recordId,
+ String topic) {
+ return new AutoValue_PubsubClient_OutgoingMessage(
+ message, timestampMsSinceEpoch, recordId, topic);
}
public static OutgoingMessage of(
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message,
long timestampMsSinceEpoch,
- @Nullable String recordId) {
+ @Nullable String recordId,
+ String topic) {
Review Comment:
ditto
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+ private final String timestampLabel;
+ private final String idLabel;
+ private final StreamingModeExecutionContext context;
+ // Function used to convert PCollection elements to PubsubMessage objects.
+
+ PubsubDynamicSink(String timestampLabel, String idLabel,
StreamingModeExecutionContext context) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.context = context;
+ }
+
+ /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+ @AutoService(SinkFactory.Registrar.class)
+ public static class Registrar implements SinkFactory.Registrar {
+
+ @Override
+ public Map<String, SinkFactory> factories() {
+ PubsubDynamicSink.Factory factory = new Factory();
+ return ImmutableMap.of(
+ "PubsubDynamicSink",
+ factory,
+ "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+ factory);
+ }
+ }
+
+ public static class Factory implements SinkFactory {
+ @Override
+ public PubsubDynamicSink create(
+ CloudObject spec,
+ Coder<?> coder,
+ @Nullable PipelineOptions options,
+ @Nullable DataflowExecutionContext executionContext,
+ DataflowOperationContext operationContext)
+ throws Exception {
+ String timestampLabel = getString(spec,
PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
+ String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");
+
+ return new PubsubDynamicSink(
+ timestampLabel, idLabel, (StreamingModeExecutionContext)
executionContext);
+ }
+ }
+
+ @Override
+ public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
+ return new PubsubDynamicSink.PubsubWriter();
+ }
+
+ class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
+ private Map<String, Windmill.PubSubMessageBundle.Builder> outputBuilders;
+ private ByteStringOutputStream stream; // Kept across adds for buffer
reuse.
+
+ PubsubWriter() {
+ outputBuilders = Maps.newHashMap();
+ stream = new ByteStringOutputStream();
+ }
+
+ public ByteString getDataFromMessage(PubsubMessage formatted,
ByteStringOutputStream stream)
+ throws IOException {
+ Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
+
Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));
Review Comment:
any way to avoid this copy?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import avro.shaded.com.google.common.collect.Lists;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link PubsubDynamicSink}. */
+@RunWith(JUnit4.class)
+public class PubsubDynamicSinkTest {
+ @Mock StreamingModeExecutionContext mockContext;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testWriteDynamicDestinations() throws Exception {
+ Windmill.WorkItemCommitRequest.Builder outputBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8("key"))
+ .setWorkToken(0);
+
+ when(mockContext.getOutputBuilder()).thenReturn(outputBuilder);
+
+ Map<String, Object> spec = new HashMap<>();
+ spec.put(PropertyNames.OBJECT_TYPE_NAME, "");
+ spec.put(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "ts");
+ spec.put(PropertyNames.PUBSUB_ID_ATTRIBUTE, "id");
+
+ CloudObject cloudSinkSpec = CloudObject.fromSpec(spec);
+ PubsubDynamicSink.Factory factory = new PubsubDynamicSink.Factory();
+ PubsubDynamicSink sink =
+ factory.create(
+ cloudSinkSpec,
+ WindowedValue.getFullCoder(VoidCoder.of(), IntervalWindow.getCoder()),
+ null,
+ mockContext,
+ null);
+
+ Sink.SinkWriter<WindowedValue<PubsubMessage>> writer = sink.writer();
+
+ List<Windmill.Message> expectedMessages = Lists.newArrayList();
+ for (int i = 0; i < 10; ++i) {
+ byte[] payload = String.format("value_%d", i).getBytes(StandardCharsets.UTF_8);
+ Pubsub.PubsubMessage pubsubMessage =
+ Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(payload)).build();
+ expectedMessages.add(
+ Windmill.Message.newBuilder()
+ .setTimestamp(i * 1000)
+ .setData(pubsubMessage.toByteString())
+ .build());
+ writer.add(
+ WindowedValue.timestampedValueInGlobalWindow(
+ new PubsubMessage(payload, null).withTopic("topic1"), new Instant(i)));
Review Comment:
vary the payload and timestamp per topic to better verify
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
* reviewers mentioned <a
*
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
* here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline start time.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to extract the topic from
+ * the record. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).
+ * to((ValueInSingleWindow<Event> quote) -> {
+ * String country = quote.getValue().getCountry();
+ * return "projects/myproject/topics/events_" + country;
+ * }));
+ * }</pre>
+ *
+ * Dynamic topics can also be specified by writing {@link PubsubMessage} objects containing the
+ * topic. For example:
+ *
+ * <pre>{@code
+ * events.apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {})
+ * .via(e -> new PubsubMessage(
+ * e.toByteString(), Collections.emptyMap()).withTopic(e.getCountry())))
+ * .apply(PubsubIO.writeMessagesDynamic());
+ * }</pre>
+ *
+ * <h3>Custom timestamps</h3>
+ *
+ * All messages read from PubSub have a stable publish timestamp that is independent of when the
+ * message is read from the PubSub topic. By default, the publish time is used as the timestamp for
+ * all messages read and the watermark is based on that. If there is a different logical timestamp
+ * to be used, that timestamp must be published in a PubSub attribute. The attribute is specified
Review Comment:
that timestamp may be published in a Pubsub attribute specified using {@link
....}
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1350,23 +1385,15 @@ public void startBundle(StartBundleContext c) throws
IOException {
@ProcessElement
public void processElement(ProcessContext c) throws IOException,
SizeLimitExceededException {
PubsubMessage message = getFormatFn().apply(c.element());
- int messageSize = validateAndGetPubsubMessageSize(message);
- if (messageSize > maxPublishBatchByteSize) {
Review Comment:
keep this as sanity check?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -242,7 +271,19 @@ private static class WriterFn extends DoFn<KV<Integer,
Iterable<OutgoingMessage>
/** BLOCKING Send {@code messages} as a batch to Pubsub. */
private void publishBatch(List<OutgoingMessage> messages, int bytes)
throws IOException {
- int n = pubsubClient.publish(topic.get(), messages);
+ int n = 0;
+ if (topic != null) {
+ n = pubsubClient.publish(topic.get(), messages);
+ } else {
+ Map<TopicPath, List<OutgoingMessage>> messagesPerTopic = Maps.newHashMap();
+ for (OutgoingMessage message : messages) {
+ TopicPath topicPath = PubsubClient.topicPathFromPath(message.topic());
Review Comment:
could do this translation during publish loop so it's per topic instead of
per message
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -495,6 +604,10 @@ static class PubsubSink extends
PTransform<PCollection<byte[]>, PDone> {
this.outer = outer;
}
+ boolean isDynamic() {
Review Comment:
remove? seems PubsubUnboundedSink would be used if this was true?
##########
website/www/site/layouts/shortcodes/flink_python_pipeline_options.html:
##########
@@ -133,6 +132,11 @@
<td>Sets the behavior of reusing objects.</td>
<td>Default: <code>false</code></td>
</tr>
+<tr>
+ <td><code>operator_chaining</code></td>
Review Comment:
ditto
##########
website/www/site/layouts/shortcodes/flink_java_pipeline_options.html:
##########
@@ -133,6 +132,11 @@
<td>Sets the behavior of reusing objects.</td>
<td>Default: <code>false</code></td>
</tr>
+<tr>
+ <td><code>operatorChaining</code></td>
Review Comment:
revert this file?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import avro.shaded.com.google.common.collect.Lists;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link PubsubDynamicSink}. */
+@RunWith(JUnit4.class)
+public class PubsubDynamicSinkTest {
+ @Mock StreamingModeExecutionContext mockContext;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testWriteDynamicDestinations() throws Exception {
+ Windmill.WorkItemCommitRequest.Builder outputBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8("key"))
+ .setWorkToken(0);
+
+ when(mockContext.getOutputBuilder()).thenReturn(outputBuilder);
+
+ Map<String, Object> spec = new HashMap<>();
+ spec.put(PropertyNames.OBJECT_TYPE_NAME, "");
+ spec.put(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "ts");
+ spec.put(PropertyNames.PUBSUB_ID_ATTRIBUTE, "id");
+
+ CloudObject cloudSinkSpec = CloudObject.fromSpec(spec);
+ PubsubDynamicSink.Factory factory = new PubsubDynamicSink.Factory();
Review Comment:
would it be better to look up in the registry if that is normally how things
are wired up? (would also mean you don't have to expose Factory)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]