[
https://issues.apache.org/jira/browse/BEAM-4863?focusedWorklogId=165998&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165998
]
ASF GitHub Bot logged work on BEAM-4863:
----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Nov/18 18:11
Start Date: 14/Nov/18 18:11
Worklog Time Spent: 10m
Work Description: stale[bot] closed pull request #6057: [BEAM-4863]
Implement consistentWithEquals/structuralValue on FullWindowedValueCoder
URL: https://github.com/apache/beam/pull/6057
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index e6311381909..9724276d364 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -32,6 +32,17 @@
return new ListCoder<>(elemCoder);
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>List coders are consistent with equals if and only if the element coder is consistent with
+ * equals.
+ */
+ @Override
+ public boolean consistentWithEquals() {
+ return getElemCoder().consistentWithEquals();
+ }
+
/////////////////////////////////////////////////////////////////////////////
// Internal operations below here.
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index 5afdb5e02e3..c4adbccfca5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -54,6 +54,17 @@ public void verifyDeterministic() throws
NonDeterministicException {
new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor());
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Set coders are consistent with equals if and only if the element coder is consistent with
+ * equals.
+ */
+ @Override
+ public boolean consistentWithEquals() {
+ return getElemCoder().consistentWithEquals();
+ }
+
/////////////////////////////////////////////////////////////////////////////
// Internal operations below here.
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 53776d31a6c..c948eeddb60 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -34,8 +34,8 @@
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -497,7 +497,7 @@ private void ensureWindowsAreASet() {
// right, and cast the window type away here.
@SuppressWarnings({"unchecked", "rawtypes"})
Coder<Collection<? extends BoundedWindow>> collectionCoder =
- (Coder) CollectionCoder.of(this.windowCoder);
+ (Coder) ListCoder.of(this.windowCoder);
this.windowsCoder = collectionCoder;
}
@@ -564,6 +564,29 @@ public void registerByteSizeObserver(WindowedValue<T>
value, ElementByteSizeObse
valueCoder.registerByteSizeObserver(value.getValue(), observer);
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>This coder is consistent with equals if the window and value coders are consistent with
+ * equals.
+ */
+ @Override
+ public boolean consistentWithEquals() {
+ return windowsCoder.consistentWithEquals() && valueCoder.consistentWithEquals();
+ }
+
+ @Override
+ public Object structuralValue(WindowedValue<T> value) {
+ if (value != null && consistentWithEquals()) {
+ return value;
+ }
+ return ImmutableList.of(
+ value.getTimestamp(),
+ valueCoder.structuralValue(value.getValue()),
+ windowsCoder.structuralValue(value.getWindows()),
+ value.getPane());
+ }
+
/**
* {@inheritDoc}.
*
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
index 334ff6bac7d..8ff27f8ff5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
@@ -18,8 +18,11 @@
package org.apache.beam.sdk.coders;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -52,6 +55,24 @@ public void testCoderIsSerializableWithWellKnownCoderType()
throws Exception {
CoderProperties.coderSerializable(ListCoder.of(GlobalWindow.Coder.INSTANCE));
}
+ @Test
+ public void testConsistentWithEquals() throws Exception {
+ assertFalse(CollectionCoder.of(ByteArrayCoder.of()).consistentWithEquals());
+ assertTrue(TEST_CODER.consistentWithEquals());
+ for (List<Integer> value : TEST_VALUES) {
+ CoderProperties.coderConsistentWithEquals(TEST_CODER, value, ImmutableList.copyOf(value));
+ }
+ }
+
+ @Test
+ public void testStructuralValue() throws Exception {
+ for (List<Integer> value : TEST_VALUES) {
+ CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value);
+ CoderProperties.structuralValueConsistentWithEquals(
+ TEST_CODER, value, ImmutableList.copyOf(value));
+ }
+ }
+
@Test
public void testDecodeEncodeContentsInSameOrder() throws Exception {
for (List<Integer> value : TEST_VALUES) {
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
index 07fe5d8083a..0411512debd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
@@ -18,8 +18,11 @@
package org.apache.beam.sdk.coders;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -52,6 +55,24 @@ public void testCoderIsSerializableWithWellKnownCoderType()
throws Exception {
CoderProperties.coderSerializable(SetCoder.of(GlobalWindow.Coder.INSTANCE));
}
+ @Test
+ public void testConsistentWithEquals() throws Exception {
+ assertFalse(CollectionCoder.of(ByteArrayCoder.of()).consistentWithEquals());
+ assertTrue(TEST_CODER.consistentWithEquals());
+ for (Set<Integer> value : TEST_VALUES) {
+ CoderProperties.coderConsistentWithEquals(TEST_CODER, value, ImmutableSet.copyOf(value));
+ }
+ }
+
+ @Test
+ public void testStructuralValue() throws Exception {
+ for (Set<Integer> value : TEST_VALUES) {
+ CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value);
+ CoderProperties.structuralValueConsistentWithEquals(
+ TEST_CODER, value, ImmutableSet.copyOf(value));
+ }
+ }
+
@Test
public void testDecodeEncodeContentsEqual() throws Exception {
for (Set<Integer> value : TEST_VALUES) {
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 9aa50f41e17..2e6e73ecd2b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -20,12 +20,15 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -48,7 +51,7 @@
@Rule public ExpectedException thrown = ExpectedException.none();
@Test
- public void testWindowedValueCoder() throws CoderException {
+ public void testWindowedValueCoder() throws Exception {
Instant timestamp = new Instant(1234);
WindowedValue<String> value =
WindowedValue.of(
@@ -69,6 +72,19 @@ public void testWindowedValueCoder() throws CoderException {
Assert.assertEquals(value.getValue(), decodedValue.getValue());
Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp());
Assert.assertArrayEquals(value.getWindows().toArray(),
decodedValue.getWindows().toArray());
+
+ WindowedValue<String> value2 =
+ WindowedValue.of(
+ "abc",
+ new Instant(1234),
+ Arrays.asList(
+ new IntervalWindow(timestamp, timestamp.plus(1000)),
+ new IntervalWindow(timestamp.plus(1000), timestamp.plus(2000))),
+ PaneInfo.NO_FIRING);
+ assertTrue(windowedValueCoder.consistentWithEquals());
+ CoderProperties.coderConsistentWithEquals(windowedValueCoder, value, value2);
+ CoderProperties.structuralValueConsistentWithEquals(windowedValueCoder, value, value2);
+ CoderProperties.structuralValueDecodeEncodeEqual(windowedValueCoder, value);
}
@Test
@@ -98,6 +114,52 @@ public void testExplodeWindowsInOneWindowEquals() {
assertThat(Iterables.getOnlyElement(value.explodeWindows()),
equalTo(value));
}
+ @Test
+ public void testGlobalWindowedValueEquality() {
+ Set<WindowedValue<String>> equivalentValues =
+ ImmutableSet.of(
+ WindowedValue.valueInGlobalWindow("testValue"),
+ WindowedValue.valueInGlobalWindow("testValue", PaneInfo.NO_FIRING),
+ WindowedValue.timestampedValueInGlobalWindow(
+ "testValue", BoundedWindow.TIMESTAMP_MIN_VALUE),
+ WindowedValue.of(
+ "testValue",
+ BoundedWindow.TIMESTAMP_MIN_VALUE,
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "testValue",
+ BoundedWindow.TIMESTAMP_MIN_VALUE,
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ PaneInfo.NO_FIRING));
+ for (WindowedValue<String> first : equivalentValues) {
+ for (WindowedValue<String> second : equivalentValues) {
+ assertThat(first, equalTo(second));
+ }
+ }
+ }
+
+ @Test
+ public void testWindowedValueEquality() {
+ Set<WindowedValue<String>> equivalentValues =
+ ImmutableSet.of(
+ WindowedValue.of(
+ "testValue",
+ new Instant(100L),
+ new IntervalWindow(new Instant(200L), new Instant(300L)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ WindowedValue.of(
+ "testValue",
+ new Instant(100L),
+ Collections.singletonList(new IntervalWindow(new Instant(200L), new Instant(300L))),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ for (WindowedValue<String> first : equivalentValues) {
+ for (WindowedValue<String> second : equivalentValues) {
+ assertThat(first, equalTo(second));
+ }
+ }
+ }
+
@Test
public void testExplodeWindowsManyWindowsMultipleWindowedValues() {
Instant now = Instant.now();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 165998)
Time Spent: 1h 50m (was: 1h 40m)
> Implement consistentWithEquals/structuralValue on FullWindowedValueCoder
> ------------------------------------------------------------------------
>
> Key: BEAM-4863
> URL: https://issues.apache.org/jira/browse/BEAM-4863
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Implementing *consistentWithEquals*/*structuralValue* boosts significantly
> the performance of using these values in comparison operations since it
> doesn't require encoding the values.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)