[BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d42f6333 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d42f6333 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d42f6333 Branch: refs/heads/master Commit: d42f6333141e85964d009110d8bea85ad4763632 Parents: 1ec59a0 Author: Ismaël MejÃa <[email protected]> Authored: Tue Jun 20 10:00:15 2017 +0200 Committer: Ismaël MejÃa <[email protected]> Committed: Tue Jun 20 10:00:15 2017 +0200 ---------------------------------------------------------------------- sdks/java/io/hbase/pom.xml | 6 +++ .../io/hbase/HBaseCoderProviderRegistrar.java | 49 ++++++++++++++++++++ .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 3 -- .../hbase/HBaseCoderProviderRegistrarTest.java | 41 ++++++++++++++++ .../apache/beam/sdk/io/hbase/HBaseIOTest.java | 9 ++-- 5 files changed, 100 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml index f81cd24..4d9d600 100644 --- a/sdks/java/io/hbase/pom.xml +++ b/sdks/java/io/hbase/pom.xml @@ -64,6 +64,12 @@ </dependency> <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-shaded-client</artifactId> <version>${hbase.version}</version> http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java new file mode 100644 index 0000000..dee3c70 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviderRegistrar; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; + +/** + * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}. + */ +@AutoService(CoderProviderRegistrar.class) +public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar { + @Override + public List<CoderProvider> getCoderProviders() { + return ImmutableList.of( + CoderProviders.forCoder(TypeDescriptor.of(Append.class), HBaseMutationCoder.of()), + CoderProviders.forCoder(TypeDescriptor.of(Delete.class), HBaseMutationCoder.of()), + CoderProviders.forCoder(TypeDescriptor.of(Increment.class), HBaseMutationCoder.of()), + CoderProviders.forCoder(TypeDescriptor.of(Mutation.class), HBaseMutationCoder.of()), + CoderProviders.forCoder(TypeDescriptor.of(Put.class), HBaseMutationCoder.of()), + CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 626fad9..c9afe89 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -127,7 +127,6 @@ import org.slf4j.LoggerFactory; * <pre>{@code * Configuration configuration = ...; * PCollection<Mutation> data = ...; - * data.setCoder(HBaseIO.WRITE_CODER); * * data.apply("write", * HBaseIO.write() @@ -666,6 +665,4 @@ public class HBaseIO { private long recordsWritten; } } - - public static final Coder<Mutation> WRITE_CODER = HBaseMutationCoder.of(); } http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java new file mode 100644 index 0000000..ac81e8a --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link HBaseCoderProviderRegistrar}. + */ +@RunWith(JUnit4.class) +public class HBaseCoderProviderRegistrarTest { + @Test + public void testResultCoderIsRegistered() throws Exception { + CoderRegistry.createDefault().getCoder(Result.class); + } + + @Test + public void testMutationCoderIsRegistered() throws Exception { + CoderRegistry.createDefault().getCoder(Mutation.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index d081139..806a27f 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -295,8 +295,7 @@ public class HBaseIOTest { createTable(table); - p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations)) - .withCoder(HBaseIO.WRITE_CODER)) + p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))) .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); p.run().waitUntilFinish(); @@ -309,7 +308,7 @@ public class HBaseIOTest { public void testWritingFailsTableDoesNotExist() throws Exception { final String table = "TEST-TABLE-DOES-NOT-EXIST"; - p.apply(Create.empty(HBaseIO.WRITE_CODER)) + p.apply(Create.empty(HBaseMutationCoder.of())) .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); // Exception will be thrown by write.validate() when write is applied. @@ -325,8 +324,8 @@ public class HBaseIOTest { final String key = "KEY"; createTable(table); - p.apply(Create.of(makeBadMutation(key)).withCoder(HBaseIO.WRITE_CODER)) - .apply(HBaseIO.write().withConfiguration(conf).withTableId(table)); + p.apply(Create.of(makeBadMutation(key))) + .apply(HBaseIO.write().withConfiguration(conf).withTableId(table)); thrown.expect(Pipeline.PipelineExecutionException.class); thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));
