[BEAM-1540] Move coders package classes to the top level
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c857c7e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c857c7e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c857c7e Branch: refs/heads/master Commit: 0c857c7e9efe83715111d8f984d09b5d9697448f Parents: 8341924 Author: Ismaël Mejía <[email protected]> Authored: Thu Feb 23 09:40:58 2017 +0100 Committer: Dan Halperin <[email protected]> Committed: Mon Feb 27 13:13:03 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 3 - .../beam/sdk/io/hbase/HBaseMutationCoder.java | 71 ++++++++++++++++++++ .../beam/sdk/io/hbase/HBaseResultCoder.java | 54 +++++++++++++++ .../beam/sdk/io/hbase/SerializableScan.java | 49 ++++++++++++++ .../sdk/io/hbase/coders/HBaseMutationCoder.java | 71 -------------------- .../sdk/io/hbase/coders/HBaseResultCoder.java | 54 --------------- .../sdk/io/hbase/coders/SerializableScan.java | 49 -------------- .../beam/sdk/io/hbase/coders/package-info.java | 24 ------- 8 files changed, 174 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0c857c7e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 75f5615..3c49db6 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -40,9 +40,6 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.BoundedSource; import 
org.apache.beam.sdk.io.hadoop.SerializableConfiguration; -import org.apache.beam.sdk.io.hbase.coders.HBaseMutationCoder; -import org.apache.beam.sdk.io.hbase.coders.HBaseResultCoder; -import org.apache.beam.sdk.io.hbase.coders.SerializableScan; import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.options.PipelineOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/0c857c7e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java new file mode 100644 index 0000000..356abc4 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hbase; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; + +/** + * A {@link Coder} that serializes and deserializes the {@link Mutation} objects using {@link + * ProtobufUtil}. + */ +public class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable { + private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder(); + + private HBaseMutationCoder() {} + + public static HBaseMutationCoder of() { + return INSTANCE; + } + + @Override + public void encode(Mutation mutation, OutputStream outStream, + Coder.Context context) throws IOException { + MutationType type = getType(mutation); + MutationProto proto = ProtobufUtil.toMutation(type, mutation); + proto.writeDelimitedTo(outStream); + } + + @Override + public Mutation decode(InputStream inStream, + Coder.Context context) throws IOException { + return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream)); + } + + private static MutationType getType(Mutation mutation) { + if (mutation instanceof Put) { + return MutationType.PUT; + } else if (mutation instanceof Delete) { + return MutationType.DELETE; + } else { + // Increment and Append are not idempotent. They should not be used in distributed jobs. 
+ throw new IllegalArgumentException("Only Put and Delete are supported"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0c857c7e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java new file mode 100644 index 0000000..8e5e128 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * A {@link Coder} that serializes and deserializes the {@link Result} objects using {@link + * ProtobufUtil}. 
+ */ +public class HBaseResultCoder extends AtomicCoder<Result> implements Serializable { + + private static final HBaseResultCoder INSTANCE = new HBaseResultCoder(); + + public static HBaseResultCoder of() { + return INSTANCE; + } + + @Override + public Result decode(InputStream inputStream, Coder.Context context) + throws IOException { + return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream)); + } + + @Override + public void encode(Result value, OutputStream outputStream, Coder.Context context) + throws IOException { + ProtobufUtil.toResult(value).writeDelimitedTo(outputStream); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0c857c7e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java new file mode 100644 index 0000000..ed2ec9e --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hbase; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * This is just a wrapper class to serialize HBase {@link Scan}. + */ +public class SerializableScan implements Serializable { + private transient Scan scan; + + public SerializableScan(Scan scan) { + this.scan = scan; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + ProtobufUtil.toScan(scan).writeDelimitedTo(out); + } + + private void readObject(ObjectInputStream in) throws IOException { + scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in)); + } + + public Scan getScan() { + return scan; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0c857c7e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java deleted file mode 100644 index a99a943..0000000 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hbase.coders; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; - -/** - * A {@link Coder} that serializes and deserializes the {@link Mutation} objects using {@link - * ProtobufUtil}. 
- */ -public class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable { - private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder(); - - private HBaseMutationCoder() {} - - public static HBaseMutationCoder of() { - return INSTANCE; - } - - @Override - public void encode(Mutation mutation, OutputStream outStream, - Coder.Context context) throws IOException { - MutationType type = getType(mutation); - MutationProto proto = ProtobufUtil.toMutation(type, mutation); - proto.writeDelimitedTo(outStream); - } - - @Override - public Mutation decode(InputStream inStream, - Coder.Context context) throws IOException { - return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream)); - } - - private static MutationType getType(Mutation mutation) { - if (mutation instanceof Put) { - return MutationType.PUT; - } else if (mutation instanceof Delete) { - return MutationType.DELETE; - } else { - // Increment and Append are not idempotent. They should not be used in distributed jobs. - throw new IllegalArgumentException("Only Put and Delete are supported"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0c857c7e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java deleted file mode 100644 index f10a517..0000000 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hbase.coders; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; - -/** - * A {@link Coder} that serializes and deserializes the {@link Result} objects using {@link - * ProtobufUtil}. 
- */ -public class HBaseResultCoder extends AtomicCoder<Result> implements Serializable { - - private static final HBaseResultCoder INSTANCE = new HBaseResultCoder(); - - public static HBaseResultCoder of() { - return INSTANCE; - } - - @Override - public Result decode(InputStream inputStream, Coder.Context context) - throws IOException { - return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream)); - } - - @Override - public void encode(Result value, OutputStream outputStream, Coder.Context context) - throws IOException { - ProtobufUtil.toResult(value).writeDelimitedTo(outputStream); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0c857c7e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java deleted file mode 100644 index 96beff9..0000000 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hbase.coders; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; - -/** - * This is just a wrapper class to serialize HBase {@link Scan}. - */ -public class SerializableScan implements Serializable { - private transient Scan scan; - - public SerializableScan(Scan scan) { - this.scan = scan; - } - - private void writeObject(ObjectOutputStream out) throws IOException { - ProtobufUtil.toScan(scan).writeDelimitedTo(out); - } - - private void readObject(ObjectInputStream in) throws IOException { - scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in)); - } - - public Scan getScan() { - return scan; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0c857c7e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java deleted file mode 100644 index d21b927..0000000 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Defines coders used while reading and writing from/to HBase. - * - * @see org.apache.beam.sdk.io.hbase.HBaseIO - */ -package org.apache.beam.sdk.io.hbase.coders;
