Repository: beam Updated Branches: refs/heads/gearpump-runner 559e3c341 -> 99f4f8b1b
Return a valid Coder for any subtype of Mutation on HBaseCoderProviderRegistrar Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b6d20d9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b6d20d9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b6d20d9 Branch: refs/heads/gearpump-runner Commit: 6b6d20d9dc5afa0c1d8520cf6dbc98e6488a58a2 Parents: a40d11c Author: Ismaël Mejía <[email protected]> Authored: Wed Jun 21 01:04:18 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Wed Jun 21 07:29:51 2017 +0200 ---------------------------------------------------------------------- .../io/hbase/HBaseCoderProviderRegistrar.java | 11 +---- .../beam/sdk/io/hbase/HBaseMutationCoder.java | 42 ++++++++++++++++++++ .../hbase/HBaseCoderProviderRegistrarTest.java | 4 ++ 3 files changed, 47 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6b6d20d9/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java index dee3c70..2973d1b 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java @@ -24,11 +24,6 @@ import org.apache.beam.sdk.coders.CoderProvider; import org.apache.beam.sdk.coders.CoderProviderRegistrar; import org.apache.beam.sdk.coders.CoderProviders; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import 
org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; /** @@ -39,11 +34,7 @@ public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar { @Override public List<CoderProvider> getCoderProviders() { return ImmutableList.of( - CoderProviders.forCoder(TypeDescriptor.of(Append.class), HBaseMutationCoder.of()), - CoderProviders.forCoder(TypeDescriptor.of(Delete.class), HBaseMutationCoder.of()), - CoderProviders.forCoder(TypeDescriptor.of(Increment.class), HBaseMutationCoder.of()), - CoderProviders.forCoder(TypeDescriptor.of(Mutation.class), HBaseMutationCoder.of()), - CoderProviders.forCoder(TypeDescriptor.of(Put.class), HBaseMutationCoder.of()), + HBaseMutationCoder.getCoderProvider(), CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6b6d20d9/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java index 501fe09..ee83114 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java @@ -21,8 +21,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.List; import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.hadoop.hbase.client.Delete; import 
org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -65,4 +69,42 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable { throw new IllegalArgumentException("Only Put and Delete are supported"); } } + + /** + * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for + * {@link Mutation mutations}. + */ + static CoderProvider getCoderProvider() { + return HBASE_MUTATION_CODER_PROVIDER; + } + + private static final CoderProvider HBASE_MUTATION_CODER_PROVIDER = + new HBaseMutationCoderProvider(); + + /** + * A {@link CoderProvider} for {@link Mutation mutations}. + */ + private static class HBaseMutationCoderProvider extends CoderProvider { + @Override + public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor, + List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException { + if (!typeDescriptor.isSubtypeOf(HBASE_MUTATION_TYPE_DESCRIPTOR)) { + throw new CannotProvideCoderException( + String.format( + "Cannot provide %s because %s is not a subclass of %s", + HBaseMutationCoder.class.getSimpleName(), + typeDescriptor, + Mutation.class.getName())); + } + + try { + return (Coder<T>) HBaseMutationCoder.of(); + } catch (IllegalArgumentException e) { + throw new CannotProvideCoderException(e); + } + } + } + + private static final TypeDescriptor<Mutation> HBASE_MUTATION_TYPE_DESCRIPTOR = + new TypeDescriptor<Mutation>() {}; } http://git-wip-us.apache.org/repos/asf/beam/blob/6b6d20d9/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java index ac81e8a..5b2e138 100644 --- 
a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java @@ -18,7 +18,9 @@ package org.apache.beam.sdk.io.hbase; import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,5 +39,7 @@ public class HBaseCoderProviderRegistrarTest { @Test public void testMutationCoderIsRegistered() throws Exception { CoderRegistry.createDefault().getCoder(Mutation.class); + CoderRegistry.createDefault().getCoder(Put.class); + CoderRegistry.createDefault().getCoder(Delete.class); } }
