Adds coders for boolean, ResourceId and Metadata
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e43b238 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e43b238 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e43b238 Branch: refs/heads/master Commit: 5e43b2388652f38a37ab3378a63ae88e6ad53ee3 Parents: 8d337ff Author: Eugene Kirpichov <[email protected]> Authored: Thu Aug 3 14:42:07 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Aug 4 16:38:23 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/BooleanCoder.java | 59 ++++++++++++++++++ .../apache/beam/sdk/coders/CoderRegistry.java | 10 ++++ .../apache/beam/sdk/io/fs/MetadataCoder.java | 63 ++++++++++++++++++++ .../apache/beam/sdk/io/fs/ResourceIdCoder.java | 56 +++++++++++++++++ .../org/apache/beam/sdk/transforms/Watch.java | 7 ++- 5 files changed, 192 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java new file mode 100644 index 0000000..e7f7543 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** A {@link Coder} for {@link Boolean}. */ +public class BooleanCoder extends AtomicCoder<Boolean> { + private static final ByteCoder BYTE_CODER = ByteCoder.of(); + + private static final BooleanCoder INSTANCE = new BooleanCoder(); + + /** Returns the singleton instance of {@link BooleanCoder}. */ + public static BooleanCoder of() { + return INSTANCE; + } + + @Override + public void encode(Boolean value, OutputStream os) throws IOException { + BYTE_CODER.encode(value ? (byte) 1 : 0, os); + } + + @Override + public Boolean decode(InputStream is) throws IOException { + return BYTE_CODER.decode(is) == 1; + } + + @Override + public boolean consistentWithEquals() { + return true; + } + + @Override + public boolean isRegisterByteSizeObserverCheap(Boolean value) { + return true; + } + + @Override + protected long getEncodedElementByteSize(Boolean value) throws Exception { + return 1; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java index 48389b1..c335bda 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java @@ -43,6 +43,10 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MetadataCoder; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.fs.ResourceIdCoder; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.CoderUtils; @@ -89,6 +93,8 @@ public class CoderRegistry { private CommonTypes() { ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder(); + builder.put(Boolean.class, + CoderProviders.fromStaticMethods(Boolean.class, BooleanCoder.class)); builder.put(Byte.class, CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class)); builder.put(BitSet.class, @@ -109,6 +115,10 @@ public class CoderRegistry { CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class)); builder.put(Map.class, CoderProviders.fromStaticMethods(Map.class, MapCoder.class)); + builder.put(Metadata.class, + CoderProviders.fromStaticMethods(Metadata.class, MetadataCoder.class)); + builder.put(ResourceId.class, + CoderProviders.fromStaticMethods(ResourceId.class, ResourceIdCoder.class)); builder.put(Set.class, CoderProviders.fromStaticMethods(Set.class, SetCoder.class)); builder.put(String.class, http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java new file mode 100644 index 0000000..5c9c4d7 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; + +/** A {@link Coder} for {@link Metadata}. */ +public class MetadataCoder extends AtomicCoder<Metadata> { + private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of(); + private static final VarIntCoder INT_CODER = VarIntCoder.of(); + private static final VarLongCoder LONG_CODER = VarLongCoder.of(); + + /** Creates a {@link MetadataCoder}. */ + public static MetadataCoder of() { + return new MetadataCoder(); + } + + @Override + public void encode(Metadata value, OutputStream os) throws IOException { + RESOURCE_ID_CODER.encode(value.resourceId(), os); + INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os); + LONG_CODER.encode(value.sizeBytes(), os); + } + + @Override + public Metadata decode(InputStream is) throws IOException { + ResourceId resourceId = RESOURCE_ID_CODER.decode(is); + boolean isReadSeekEfficient = INT_CODER.decode(is) == 1; + long sizeBytes = LONG_CODER.decode(is); + return Metadata.builder() + .setResourceId(resourceId) + .setIsReadSeekEfficient(isReadSeekEfficient) + .setSizeBytes(sizeBytes) + .build(); + } + + @Override + public boolean consistentWithEquals() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java new file mode 100644 index 0000000..d7649c0 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileSystems; + +/** A {@link Coder} for {@link ResourceId}. */ +public class ResourceIdCoder extends AtomicCoder<ResourceId> { + private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + private static final Coder<Boolean> BOOL_CODER = BooleanCoder.of(); + + /** Creates a {@link ResourceIdCoder}. */ + public static ResourceIdCoder of() { + return new ResourceIdCoder(); + } + + @Override + public void encode(ResourceId value, OutputStream os) throws IOException { + STRING_CODER.encode(value.toString(), os); + BOOL_CODER.encode(value.isDirectory(), os); + } + + @Override + public ResourceId decode(InputStream is) throws IOException { + String spec = STRING_CODER.decode(is); + boolean isDirectory = BOOL_CODER.decode(is); + return FileSystems.matchNewResource(spec, isDirectory); + } + + @Override + public boolean consistentWithEquals() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index b21eb62..fc6f18d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -47,6 +47,7 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BooleanCoder; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DurationCoder; @@ -958,7 +959,7 @@ public class Watch { return new GrowthStateCoder<>(outputCoder, terminationStateCoder); } - private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + private static final Coder<Boolean> BOOLEAN_CODER = BooleanCoder.of(); private static final Coder<Instant> INSTANT_CODER = NullableCoder.of(InstantCoder.of()); private static final Coder<HashCode> HASH_CODE_CODER = HashCode128Coder.of(); @@ -980,7 +981,7 @@ public class Watch { throws IOException { completedCoder.encode(value.completed, os); pendingCoder.encode(value.pending, os); - INT_CODER.encode(value.isOutputComplete ? 1 : 0, os); + BOOLEAN_CODER.encode(value.isOutputComplete, os); terminationStateCoder.encode(value.terminationState, os); INSTANT_CODER.encode(value.pollWatermark, os); } @@ -989,7 +990,7 @@ public class Watch { public GrowthState<OutputT, TerminationStateT> decode(InputStream is) throws IOException { Map<HashCode, Instant> completed = completedCoder.decode(is); List<TimestampedValue<OutputT>> pending = pendingCoder.decode(is); - boolean isOutputComplete = (INT_CODER.decode(is) == 1); + boolean isOutputComplete = BOOLEAN_CODER.decode(is); TerminationStateT terminationState = terminationStateCoder.decode(is); Instant pollWatermark = INSTANT_CODER.decode(is); return new GrowthState<>(
