Provides a default coder for PubsubMessage
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e57b5013 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e57b5013 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e57b5013 Branch: refs/heads/master Commit: e57b5013bc2d36602d24b8aba98ba1d28ec04933 Parents: 5b0a868 Author: Eugene Kirpichov <[email protected]> Authored: Wed May 3 18:16:47 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Wed May 3 19:18:46 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java | 36 ++++++++++++++++++++ .../PubsubMessageWithAttributesCoder.java | 5 +++ .../io/gcp/pubsub/PubsubUnboundedSinkTest.java | 3 -- 3 files changed, 41 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e57b5013/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java new file mode 100644 index 0000000..5944305 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.beam.sdk.coders.CoderFactories; +import org.apache.beam.sdk.coders.CoderFactory; +import org.apache.beam.sdk.coders.CoderRegistrar; + +/** A {@link CoderRegistrar} for standard types used with {@link PubsubIO}. */ +@AutoService(CoderRegistrar.class) +public class PubsubCoderRegistrar implements CoderRegistrar { + @Override + public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() { + return ImmutableMap.<Class<?>, CoderFactory>of( + PubsubIO.PubsubMessage.class, + CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e57b5013/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java index 27f0f02..be9493c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; /** A coder for PubsubMessage including attributes. */ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.PubsubMessage> { @@ -35,6 +36,10 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.Pubsu private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of( StringUtf8Coder.of(), StringUtf8Coder.of()); + public static Coder<PubsubIO.PubsubMessage> of(TypeDescriptor<PubsubIO.PubsubMessage> ignored) { + return of(); + } + public static PubsubMessageWithAttributesCoder of() { return new PubsubMessageWithAttributesCoder(); } http://git-wip-us.apache.org/repos/asf/beam/blob/e57b5013/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index 11e7d83..f2f40bb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -115,7 +115,6 @@ public class PubsubUnboundedSinkTest implements Serializable { RecordIdMethod.DETERMINISTIC); p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp(ATTRIBUTES))) - .setCoder(PubsubMessageWithAttributesCoder.of()) .apply(sink); p.run(); } @@ -145,7 +144,6 @@ public class PubsubUnboundedSinkTest implements Serializable { Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) - .setCoder(PubsubMessagePayloadOnlyCoder.of()) .apply(sink); p.run(); } @@ -182,7 +180,6 @@ public class PubsubUnboundedSinkTest implements Serializable { RecordIdMethod.DETERMINISTIC); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) - .setCoder(PubsubMessagePayloadOnlyCoder.of()) .apply(sink); p.run(); }
