Move ValueWithRecordId to sdk.values and annotate it as @Internal
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2553caf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2553caf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2553caf Branch: refs/heads/master Commit: b2553caf1350eaea3caefe55d5af414694c96424 Parents: e0b3f80 Author: Kenneth Knowles <[email protected]> Authored: Wed May 3 20:19:51 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu May 4 16:06:55 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../beam/runners/spark/TestSparkRunner.java | 2 +- .../sdk/io/BoundedReadFromUnboundedSource.java | 2 +- .../apache/beam/sdk/util/ValueWithRecordId.java | 134 ------------------ .../beam/sdk/values/ValueWithRecordId.java | 138 +++++++++++++++++++ .../beam/sdk/util/ValueWithRecordIdTest.java | 34 ----- .../beam/sdk/values/ValueWithRecordIdTest.java | 34 +++++ 7 files changed, 175 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 7123316..57da61b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -121,7 +121,6 @@ import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.NameUtils; import 
org.apache.beam.sdk.util.ReleaseInfo; -import org.apache.beam.sdk.util.ValueWithRecordId; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; @@ -132,6 +131,7 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 6808d7b..a6851c4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -48,11 +48,11 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.ValueWithRecordId; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.commons.io.FileUtils; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index e54176f..d9adf92 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -37,9 +37,9 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.NameUtils; -import org.apache.beam.sdk.util.ValueWithRecordId; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.ValueWithRecordId; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java deleted file mode 100644 index 9902aa7..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.base.MoreObjects; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * Immutable struct containing a value as well as a unique id identifying the value. - * - * @param <ValueT> the underlying value type - */ -public class ValueWithRecordId<ValueT> { - private final ValueT value; - private final byte[] id; - - public ValueWithRecordId(ValueT value, byte[] id) { - this.value = value; - this.id = id; - } - - public ValueT getValue() { - return value; - } - - public byte[] getId() { - return id; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("id", id) - .add("value", value) - .toString(); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof ValueWithRecordId)) { - return false; - } - ValueWithRecordId<?> otherRecord = (ValueWithRecordId<?>) other; - return Objects.deepEquals(id, otherRecord.id) - && Objects.deepEquals(value, otherRecord.value); - } - - @Override - public int hashCode() { - return Objects.hash(Arrays.hashCode(id), value); - } - - /** - * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}. 
- */ - public static class ValueWithRecordIdCoder<ValueT> - extends CustomCoder<ValueWithRecordId<ValueT>> { - public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) { - return new ValueWithRecordIdCoder<>(valueCoder); - } - - protected ValueWithRecordIdCoder(Coder<ValueT> valueCoder) { - this.valueCoder = valueCoder; - this.idCoder = ByteArrayCoder.of(); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Arrays.asList(valueCoder); - } - - @Override - public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context) - throws IOException { - valueCoder.encode(value.value, outStream, context.nested()); - idCoder.encode(value.id, outStream, context); - } - - @Override - public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context) - throws IOException { - return new ValueWithRecordId<ValueT>( - valueCoder.decode(inStream, context.nested()), - idCoder.decode(inStream, context)); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - valueCoder.verifyDeterministic(); - } - - public Coder<ValueT> getValueCoder() { - return valueCoder; - } - - Coder<ValueT> valueCoder; - ByteArrayCoder idCoder; - } - - /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. 
*/ - public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().getValue()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java new file mode 100644 index 0000000..0d92f40 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.values; + +import com.google.common.base.MoreObjects; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * <b>For internal use only; no backwards compatibility guarantees.</b> + * + * <p>Immutable struct containing a value as well as a unique id identifying the value. + * + * @param <ValueT> the underlying value type + */ +@Internal +public class ValueWithRecordId<ValueT> { + private final ValueT value; + private final byte[] id; + + public ValueWithRecordId(ValueT value, byte[] id) { + this.value = value; + this.id = id; + } + + public ValueT getValue() { + return value; + } + + public byte[] getId() { + return id; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", id) + .add("value", value) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof ValueWithRecordId)) { + return false; + } + ValueWithRecordId<?> otherRecord = (ValueWithRecordId<?>) other; + return Objects.deepEquals(id, otherRecord.id) + && Objects.deepEquals(value, otherRecord.value); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(id), value); + } + + /** + * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}. 
+ */ + public static class ValueWithRecordIdCoder<ValueT> + extends CustomCoder<ValueWithRecordId<ValueT>> { + public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) { + return new ValueWithRecordIdCoder<>(valueCoder); + } + + protected ValueWithRecordIdCoder(Coder<ValueT> valueCoder) { + this.valueCoder = valueCoder; + this.idCoder = ByteArrayCoder.of(); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Arrays.asList(valueCoder); + } + + @Override + public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context) + throws IOException { + valueCoder.encode(value.value, outStream, context.nested()); + idCoder.encode(value.id, outStream, context); + } + + @Override + public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context) + throws IOException { + return new ValueWithRecordId<ValueT>( + valueCoder.decode(inStream, context.nested()), + idCoder.decode(inStream, context)); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + valueCoder.verifyDeterministic(); + } + + public Coder<ValueT> getValueCoder() { + return valueCoder; + } + + Coder<ValueT> valueCoder; + ByteArrayCoder idCoder; + } + + /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. 
*/ + public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java deleted file mode 100644 index e3a2dc6..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.ValueWithRecordId.ValueWithRecordIdCoder; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link ValueWithRecordId}. 
*/ -@RunWith(JUnit4.class) -public class ValueWithRecordIdTest { - @Test - public void testCoderIsSerializableWithWellKnownCoderType() { - CoderProperties.coderSerializable(ValueWithRecordIdCoder.of(GlobalWindow.Coder.INSTANCE)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b2553caf/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java new file mode 100644 index 0000000..987c9af --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/ValueWithRecordIdTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ValueWithRecordId}. 
*/ +@RunWith(JUnit4.class) +public class ValueWithRecordIdTest { + @Test + public void testCoderIsSerializableWithWellKnownCoderType() { + CoderProperties.coderSerializable(ValueWithRecordIdCoder.of(GlobalWindow.Coder.INSTANCE)); + } +}
