This is an automated email from the ASF dual-hosted git repository.
ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 399d9d7ea6b Introduce ValueKind to Java and add to WindowedValue (#38315)
399d9d7ea6b is described below
commit 399d9d7ea6b510195741049523cda19eb94c1569
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon May 11 10:42:09 2026 -0400
Introduce ValueKind to Java and add to WindowedValue (#38315)
* introduce ValueKind to Java and add to WindowedValue
* map UNSPECIFIED to INSERT; cleanup
* rebase remnants
* adjust equals and hashcode methods to include valuekind
* use existing coder test
* compile fixes
* compile fixes in dataflow
* compile fixes in dataflow
* add test to WindmillKeyedWorkItem
* compile fixes
---
.../runners/core/LateDataDroppingDoFnRunner.java | 3 +-
...TimeBoundedSplittableProcessElementInvoker.java | 6 +-
.../core/SplittableParDoViaKeyedWorkItems.java | 3 +-
.../beam/runners/dataflow/BatchViewOverrides.java | 6 +
.../dataflow/worker/UngroupedWindmillReader.java | 10 +-
.../dataflow/worker/WindmillKeyedWorkItem.java | 14 +-
.../dataflow/worker/util/ValueInEmptyWindows.java | 6 +
.../dataflow/worker/WindmillKeyedWorkItemTest.java | 43 ++++
.../apache/beam/runners/spark/util/TimerUtils.java | 6 +
.../org/apache/beam/sdk/values/OutputBuilder.java | 2 +
.../beam/sdk/values/ValueInSingleWindow.java | 52 ++++-
.../java/org/apache/beam/sdk/values/ValueKind.java | 40 ++++
.../org/apache/beam/sdk/values/ValueKindUtil.java | 54 +++++
.../org/apache/beam/sdk/values/WindowedValue.java | 4 +
.../org/apache/beam/sdk/values/WindowedValues.java | 230 ++++++++++++++++-----
.../apache/beam/sdk/util/WindowedValueTest.java | 5 +-
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 3 +-
17 files changed, 428 insertions(+), 59 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 2ce94533d9e..41052a76f13 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -152,7 +152,8 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT,
W extends BoundedWin
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain(),
- element.getOpenTelemetryContext()));
+ element.getOpenTelemetryContext(),
+ element.getValueKind()));
}
}
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index dc84b15c89f..ebd88442b21 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -452,7 +452,8 @@ public class
OutputAndTimeBoundedSplittableProcessElementInvoker<
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain(),
- element.getOpenTelemetryContext()));
+ element.getOpenTelemetryContext(),
+ element.getValueKind()));
}
@Override
@@ -476,7 +477,8 @@ public class
OutputAndTimeBoundedSplittableProcessElementInvoker<
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain(),
- element.getOpenTelemetryContext()));
+ element.getOpenTelemetryContext(),
+ element.getValueKind()));
}
@Override
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 7a0f984479a..424ea567115 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -473,7 +473,8 @@ public class SplittableParDoViaKeyedWorkItems {
read.getRecordId(),
read.getRecordOffset(),
CausedByDrain.CAUSED_BY_DRAIN,
- read.getOpenTelemetryContext());
+ read.getOpenTelemetryContext(),
+ read.getValueKind());
}
elementAndRestriction = KV.of(read, restrictionState.read());
watermarkEstimatorStateT = watermarkEstimatorState.read();
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index e5ac4e99285..b15b3f3834d 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -74,6 +74,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
@@ -1415,6 +1416,11 @@ class BatchViewOverrides {
return null;
}
+ @Override
+ public ValueKind getValueKind() {
+ return ValueKind.INSERT;
+ }
+
@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
index 8ef0bf80323..0b883c04846 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueKind;
+import org.apache.beam.sdk.values.ValueKindUtil;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
@@ -139,6 +141,7 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
* drain happened upstream
*/
CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
+ ValueKind valueKind = ValueKind.INSERT;
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
@@ -146,6 +149,7 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
elementMetadata.getDrain() ==
BeamFnApi.Elements.DrainMode.Enum.DRAINING
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
+ valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
}
if (valueCoder instanceof KvCoder) {
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
@@ -164,7 +168,8 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
null,
null,
drainingValueFromUpstream,
- null);
+ null,
+ valueKind);
} else {
notifyElementRead(data.available() + metadata.available());
// todo #37030 parse context from previous stage
@@ -176,7 +181,8 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
null,
null,
drainingValueFromUpstream,
- null);
+ null,
+ valueKind);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
index 4c968030eef..033e4186158 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
@@ -42,6 +42,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.ValueKind;
+import org.apache.beam.sdk.values.ValueKindUtil;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate;
@@ -148,6 +150,7 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
* drain happened upstream
*/
CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
+ ValueKind valueKind = ValueKind.INSERT;
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
@@ -155,11 +158,20 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
elementMetadata.getDrain() ==
BeamFnApi.Elements.DrainMode.Enum.DRAINING
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
+ valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
}
InputStream inputStream = message.getData().newInput();
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
return WindowedValues.of(
- value, timestamp, windows, paneInfo, null, null,
drainingValueFromUpstream, null);
+ value,
+ timestamp,
+ windows,
+ paneInfo,
+ null,
+ null,
+ drainingValueFromUpstream,
+ null,
+ valueKind);
} catch (RuntimeException | IOException e) {
if (!skipUndecodableElements) {
throw new RuntimeException(e);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java
index 67ffdc9f3b8..d5a493577f4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -71,6 +72,11 @@ public class ValueInEmptyWindows<T> implements
WindowedValue<T> {
return null;
}
+ @Override
+ public ValueKind getValueKind() {
+ return ValueKind.INSERT;
+ }
+
@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index c1568058435..e6738246b53 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -46,6 +46,7 @@ import
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
@@ -374,6 +375,48 @@ public class WindmillKeyedWorkItemTest {
assertThat(
keyedWorkItem.timersIterable(),
Matchers.contains(makeDrainingTimer(STATE_NAMESPACE_2, 3,
TimeDomain.EVENT_TIME)));
+ WindowedValues.WindowedValueCoder.setMetadataNotSupported();
+ }
+
+ @Test
+ public void testValueKindPropagated() throws Exception {
+ WindowedValues.WindowedValueCoder.setMetadataSupported();
+ Windmill.WorkItem.Builder workItem =
+ Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
+ Windmill.InputMessageBundle.Builder chunk1 =
workItem.addMessageBundlesBuilder();
+ chunk1.setSourceComputationId("computation");
+ addElementWithMetadata(
+ chunk1,
+ 5,
+ "hello",
+ WINDOW_1,
+ paneInfo(0),
+ BeamFnApi.Elements.ElementMetadata.newBuilder()
+ .setValueKind(BeamFnApi.Elements.ValueKind.Enum.UPDATE_AFTER)
+ .build());
+ addElementWithMetadata(
+ chunk1,
+ 7,
+ "world",
+ WINDOW_2,
+ paneInfo(2),
+ BeamFnApi.Elements.ElementMetadata.newBuilder()
+ .setValueKind(BeamFnApi.Elements.ValueKind.Enum.DELETE)
+ .build());
+ KeyedWorkItem<String, String> keyedWorkItem =
+ new WindmillKeyedWorkItem<>(
+ KEY,
+ workItem.build(),
+ WINDOW_CODER,
+ WINDOWS_CODER,
+ VALUE_CODER,
+ windmillTagEncoding,
+ true);
+
+ Iterator<WindowedValue<String>> iterator =
keyedWorkItem.elementsIterable().iterator();
+ Assert.assertEquals(ValueKind.UPDATE_AFTER,
iterator.next().getValueKind());
+ Assert.assertEquals(ValueKind.DELETE, iterator.next().getValueKind());
+ WindowedValues.WindowedValueCoder.setMetadataNotSupported();
}
private static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) {
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
index b6dda82239c..2111867d385 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
@@ -127,6 +128,11 @@ public class TimerUtils {
return CausedByDrain.NORMAL;
}
+ @Override
+ public ValueKind getValueKind() {
+ return ValueKind.INSERT;
+ }
+
@Override
public @Nullable Long getRecordOffset() {
return null;
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
index ea73a5c35d7..c0894240572 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
@@ -53,5 +53,7 @@ public interface OutputBuilder<T> extends WindowedValue<T> {
OutputBuilder<T> setOpenTelemetryContext(@Nullable Context
openTelemetryContext);
+ OutputBuilder<T> setValueKind(ValueKind valueKind);
+
void output();
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index afd5aa84520..ef7b40aabe4 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -71,6 +71,8 @@ public abstract class ValueInSingleWindow<T> {
public abstract @Nullable Context getOpenTelemetryContext();
+ public abstract ValueKind getValueKind();
+
// todo #33176 specify additional metadata in the future
public static <T> ValueInSingleWindow<T> of(
T value,
@@ -81,6 +83,28 @@ public abstract class ValueInSingleWindow<T> {
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
@Nullable Context openTelemetryContext) {
+ return of(
+ value,
+ timestamp,
+ window,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ openTelemetryContext,
+ ValueKind.INSERT);
+ }
+
+ public static <T> ValueInSingleWindow<T> of(
+ T value,
+ Instant timestamp,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ @Nullable String currentRecordId,
+ @Nullable Long currentRecordOffset,
+ CausedByDrain causedByDrain,
+ @Nullable Context openTelemetryContext,
+ ValueKind valueKind) {
return new AutoValue_ValueInSingleWindow<>(
value,
timestamp,
@@ -89,12 +113,22 @@ public abstract class ValueInSingleWindow<T> {
currentRecordId,
currentRecordOffset,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
}
public static <T> ValueInSingleWindow<T> of(
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
- return of(value, timestamp, window, paneInfo, null, null,
CausedByDrain.NORMAL, null);
+ return of(
+ value,
+ timestamp,
+ window,
+ paneInfo,
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ null,
+ ValueKind.INSERT);
}
/** A coder for {@link ValueInSingleWindow}. */
@@ -144,9 +178,9 @@ public abstract class ValueInSingleWindow<T> {
io.opentelemetry.context.Context openTelemetryContext =
windowedElem.getOpenTelemetryContext();
if (openTelemetryContext != null) {
-
OpenTelemetryContextPropagator.set(openTelemetryContext, builder);
}
+
builder.setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind()));
BeamFnApi.Elements.ElementMetadata metadata = builder.build();
ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
}
@@ -168,6 +202,7 @@ public abstract class ValueInSingleWindow<T> {
PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
CausedByDrain causedByDrain = CausedByDrain.NORMAL;
io.opentelemetry.context.@Nullable Context openTelemetryContext = null;
+ ValueKind valueKind = ValueKind.INSERT;
if (WindowedValues.WindowedValueCoder.isMetadataSupported() &&
paneInfo.isElementMetadata()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
@@ -176,12 +211,21 @@ public abstract class ValueInSingleWindow<T> {
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
openTelemetryContext =
OpenTelemetryContextPropagator.read(elementMetadata);
+ valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
}
T value = valueCoder.decode(inStream, context);
// todo #33176 specify additional metadata in the future
return new AutoValue_ValueInSingleWindow<>(
- value, timestamp, window, paneInfo, null, null, causedByDrain,
openTelemetryContext);
+ value,
+ timestamp,
+ window,
+ paneInfo,
+ null,
+ null,
+ causedByDrain,
+ openTelemetryContext,
+ valueKind);
}
@Override
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java
new file mode 100644
index 00000000000..7a190496ca2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+/** The type of change operation represented by a Change Data Capture (CDC) record. */
+public enum ValueKind {
+ /** Indicates a new record was created in the source system. */
+ INSERT,
+
+ /**
+ * Indicates the state of a record immediately <b>before</b> an update occurred. This is typically
+ * used to identify the previous values of modified columns or to locate the record via its
+ * primary key.
+ */
+ UPDATE_BEFORE,
+
+ /**
+ * Indicates the state of a record immediately <b>after</b> an update occurred. Represents the
+ * current, valid state of the record following the change.
+ */
+ UPDATE_AFTER,
+
+ /** Indicates that an existing record was removed from the source system. */
+ DELETE
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java
new file mode 100644
index 00000000000..46ceb8f6247
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+
+/** Utility class for converting between {@link ValueKind} and {@link Elements.ValueKind.Enum}. */
+public final class ValueKindUtil {
+ public static Elements.ValueKind.Enum toProto(ValueKind valueKind) {
+ switch (valueKind) {
+ case INSERT:
+ return Elements.ValueKind.Enum.INSERT;
+ case UPDATE_BEFORE:
+ return Elements.ValueKind.Enum.UPDATE_BEFORE;
+ case UPDATE_AFTER:
+ return Elements.ValueKind.Enum.UPDATE_AFTER;
+ case DELETE:
+ return Elements.ValueKind.Enum.DELETE;
+ default:
+ throw new IllegalArgumentException("Unknown ValueKind: " + valueKind);
+ }
+ }
+
+ public static ValueKind fromProto(Elements.ValueKind.Enum proto) {
+ switch (proto) {
+ case VALUE_KIND_UNSPECIFIED:
+ case INSERT:
+ return ValueKind.INSERT;
+ case UPDATE_BEFORE:
+ return ValueKind.UPDATE_BEFORE;
+ case UPDATE_AFTER:
+ return ValueKind.UPDATE_AFTER;
+ case DELETE:
+ return ValueKind.DELETE;
+ default:
+ throw new IllegalArgumentException("Unknown ValueKind: " + proto);
+ }
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
index 3bf51b26630..13796018d7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
@@ -58,6 +58,10 @@ public interface WindowedValue<T> {
CausedByDrain causedByDrain();
+ /** Returns the {@link ValueKind} associated with this WindowedValue. */
+ @Pure
+ ValueKind getValueKind();
+
/**
* A representation of each of the actual values represented by this compressed {@link
* WindowedValue}, one per window.
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
index 58595875fd3..33700a9dc0d 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
@@ -83,7 +83,8 @@ public class WindowedValues {
.setRecordOffset(template.getRecordOffset())
.setRecordId(template.getRecordId())
.setCausedByDrain(template.causedByDrain())
- .setOpenTelemetryContext(template.getOpenTelemetryContext());
+ .setOpenTelemetryContext(template.getOpenTelemetryContext())
+ .setValueKind(template.getValueKind());
}
public static class Builder<T> implements OutputBuilder<T> {
@@ -107,6 +108,7 @@ public class WindowedValues {
private @Nullable Long recordOffset;
private CausedByDrain causedByDrain = CausedByDrain.NORMAL;
private @Nullable Context openTelemetryContext;
+ private ValueKind valueKind = ValueKind.INSERT;
@Override
public Builder<T> setValue(T value) {
@@ -163,6 +165,13 @@ public class WindowedValues {
return this;
}
+ @Override
+ public Builder<T> setValueKind(ValueKind valueKind) {
+ checkStateNotNull(valueKind, "ValueKind is null");
+ this.valueKind = valueKind;
+ return this;
+ }
+
public Builder<T> setReceiver(WindowedValueReceiver<T> receiver) {
this.receiver = receiver;
return this;
@@ -222,6 +231,12 @@ public class WindowedValues {
return causedByDrain;
}
+ @Override
+ public ValueKind getValueKind() {
+ checkStateNotNull(valueKind, "ValueKind not set");
+ return valueKind;
+ }
+
@Override
public Collection<Builder<T>> explodeWindows() {
throw new UnsupportedOperationException(
@@ -258,7 +273,8 @@ public class WindowedValues {
getRecordId(),
getRecordOffset(),
causedByDrain(),
- getOpenTelemetryContext());
+ getOpenTelemetryContext(),
+ getValueKind());
}
@Override
@@ -269,6 +285,7 @@ public class WindowedValues {
.add("windows", getWindows())
.add("paneInfo", getPaneInfo())
.add("causedByDrain", causedByDrain())
+ .add("valueKind", getValueKind())
.add("receiver", receiver)
.toString();
}
@@ -276,7 +293,16 @@ public class WindowedValues {
public static <T> WindowedValue<T> of(
T value, Instant timestamp, Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
- return of(value, timestamp, windows, paneInfo, null, null,
CausedByDrain.NORMAL, null);
+ return of(
+ value,
+ timestamp,
+ windows,
+ paneInfo,
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ null,
+ ValueKind.INSERT);
}
/** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */
@@ -288,10 +314,12 @@ public class WindowedValues {
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context openTelemetryContext) {
+ @Nullable Context openTelemetryContext,
+ ValueKind valueKind) {
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it
was null");
checkArgument(windows.size() > 0, "WindowedValue requires windows, but
there were none");
checkArgument(causedByDrain != null, "WindowedValue requires
CausedByDrain, but it was null");
+ checkArgument(valueKind != null, "WindowedValue requires ValueKind, but it
was null");
if (windows.size() == 1) {
return of(
value,
@@ -301,7 +329,8 @@ public class WindowedValues {
currentRecordId,
currentRecordOffset,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
} else {
return new TimestampedValueInMultipleWindows<>(
value,
@@ -311,7 +340,8 @@ public class WindowedValues {
currentRecordId,
currentRecordOffset,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
}
}
@@ -323,7 +353,8 @@ public class WindowedValues {
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
CausedByDrain causedByDrain,
- @Nullable Context openTelemetryContext) {
+ @Nullable Context openTelemetryContext,
+ ValueKind valueKind) {
if (windows.size() == 1) {
return of(
value,
@@ -333,10 +364,19 @@ public class WindowedValues {
null,
null,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
} else {
return new TimestampedValueInMultipleWindows<>(
- value, timestamp, windows, paneInfo, null, null, causedByDrain,
openTelemetryContext);
+ value,
+ timestamp,
+ windows,
+ paneInfo,
+ null,
+ null,
+ causedByDrain,
+ openTelemetryContext,
+ valueKind);
}
}
@@ -345,7 +385,16 @@ public class WindowedValues {
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it
was null");
- return of(value, timestamp, window, paneInfo, null, null,
CausedByDrain.NORMAL, null);
+ return of(
+ value,
+ timestamp,
+ window,
+ paneInfo,
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ null,
+ ValueKind.INSERT);
}
/** Returns a {@code WindowedValue} with the given value, timestamp, and window. */
@@ -357,9 +406,11 @@ public class WindowedValues {
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context openTelemetryContext) {
+ @Nullable Context openTelemetryContext,
+ ValueKind valueKind) {
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it
was null");
checkArgument(causedByDrain != null, "WindowedValue requires
CausedByDrain, but it was null");
+ checkArgument(valueKind != null, "WindowedValue requires ValueKind, but it
was null");
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
@@ -369,7 +420,8 @@ public class WindowedValues {
currentRecordId,
currentRecordOffset,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
} else if (isGlobal) {
return new TimestampedValueInGlobalWindow<>(
value,
@@ -378,7 +430,8 @@ public class WindowedValues {
currentRecordId,
currentRecordOffset,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
} else {
return new TimestampedValueInSingleWindow<>(
value,
@@ -388,7 +441,8 @@ public class WindowedValues {
currentRecordId,
currentRecordOffset,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
}
}
@@ -398,7 +452,7 @@ public class WindowedValues {
*/
public static <T> WindowedValue<T> valueInGlobalWindow(T value) {
return new ValueInGlobalWindow<>(
- value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL, null);
+ value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL, null,
ValueKind.INSERT);
}
/**
@@ -406,7 +460,8 @@ public class WindowedValues {
* default timestamp and the specified pane.
*/
public static <T> WindowedValue<T> valueInGlobalWindow(T value, PaneInfo
paneInfo) {
- return new ValueInGlobalWindow<>(value, paneInfo, null, null,
CausedByDrain.NORMAL, null);
+ return new ValueInGlobalWindow<>(
+ value, paneInfo, null, null, CausedByDrain.NORMAL, null,
ValueKind.INSERT);
}
/**
@@ -418,7 +473,14 @@ public class WindowedValues {
return valueInGlobalWindow(value);
} else {
return new TimestampedValueInGlobalWindow<>(
- value, timestamp, PaneInfo.NO_FIRING, null, null,
CausedByDrain.NORMAL, null);
+ value,
+ timestamp,
+ PaneInfo.NO_FIRING,
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ null,
+ ValueKind.INSERT);
}
}
@@ -432,7 +494,7 @@ public class WindowedValues {
return timestampedValueInGlobalWindow(value, timestamp);
} else {
return new TimestampedValueInGlobalWindow<>(
- value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL, null);
+ value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL, null,
ValueKind.INSERT);
}
}
@@ -450,7 +512,8 @@ public class WindowedValues {
windowedValue.getRecordId(),
windowedValue.getRecordOffset(),
windowedValue.causedByDrain(),
- windowedValue.getOpenTelemetryContext());
+ windowedValue.getOpenTelemetryContext(),
+ windowedValue.getValueKind());
}
public static <T> boolean equals(
@@ -469,7 +532,8 @@ public class WindowedValues {
return left.getTimestamp().isEqual(right.getTimestamp())
&& Objects.equals(left.getValue(), right.getValue())
&& Iterables.elementsEqual(left.getWindows(), right.getWindows())
- && Objects.equals(left.getPaneInfo(), right.getPaneInfo());
+ && Objects.equals(left.getPaneInfo(), right.getPaneInfo())
+ && Objects.equals(left.getValueKind(), right.getValueKind());
}
public static <T> int hashCode(WindowedValue<T> windowedValue) {
@@ -478,7 +542,8 @@ public class WindowedValues {
windowedValue.getValue(),
windowedValue.getTimestamp().getMillis(),
windowedValue.getWindows(),
- windowedValue.getPaneInfo());
+ windowedValue.getPaneInfo(),
+ windowedValue.getValueKind());
}
private static final Collection<? extends BoundedWindow> GLOBAL_WINDOWS =
@@ -503,6 +568,7 @@ public class WindowedValues {
private final @Nullable Long currentRecordOffset;
private final CausedByDrain causedByDrain;
private final @Nullable Context context;
+ private final ValueKind valueKind;
@Override
public @Nullable String getRecordId() {
@@ -519,19 +585,26 @@ public class WindowedValues {
return causedByDrain;
}
+ @Override
+ public ValueKind getValueKind() {
+ return valueKind;
+ }
+
protected SimpleWindowedValue(
T value,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context context) {
+ @Nullable Context context,
+ ValueKind valueKind) {
this.value = value;
this.paneInfo = checkNotNull(paneInfo);
this.currentRecordId = currentRecordId;
this.currentRecordOffset = currentRecordOffset;
this.causedByDrain = causedByDrain;
this.context = context;
+ this.valueKind = checkNotNull(valueKind, "ValueKind is null");
}
@Override
@@ -586,8 +659,9 @@ public class WindowedValues {
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context context) {
- super(value, pane, currentRecordId, currentRecordOffset, causedByDrain, context);
+ @Nullable Context context,
+ ValueKind valueKind) {
+ super(value, pane, currentRecordId, currentRecordOffset, causedByDrain, context, valueKind);
}
@Override
@@ -606,8 +680,10 @@ public class WindowedValues {
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context context) {
- super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, context);
+ @Nullable Context context,
+ ValueKind valueKind) {
+ super(
+ value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, context, valueKind);
}
@Override
@@ -628,7 +704,8 @@ public class WindowedValues {
getRecordId(),
getRecordOffset(),
causedByDrain(),
- getOpenTelemetryContext());
+ getOpenTelemetryContext(),
+ getValueKind());
}
@Override
@@ -668,8 +745,10 @@ public class WindowedValues {
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context context) {
- super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, context);
+ @Nullable Context context,
+ ValueKind valueKind) {
+ super(
+ value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, context, valueKind);
this.timestamp = checkNotNull(timestamp);
}
@@ -693,9 +772,17 @@ public class WindowedValues {
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context context) {
+ @Nullable Context context,
+ ValueKind valueKind) {
super(
- value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, context);
+ value,
+ timestamp,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ context,
+ valueKind);
}
@Override
@@ -717,7 +804,8 @@ public class WindowedValues {
getRecordId(),
getRecordOffset(),
causedByDrain(),
- getOpenTelemetryContext());
+ getOpenTelemetryContext(),
+ getValueKind());
}
@Override
@@ -769,7 +857,8 @@ public class WindowedValues {
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context openTelemetryContext) {
+ @Nullable Context openTelemetryContext,
+ ValueKind valueKind) {
super(
value,
timestamp,
@@ -777,7 +866,8 @@ public class WindowedValues {
currentRecordId,
currentRecordOffset,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
this.window = checkNotNull(window);
}
@@ -791,7 +881,8 @@ public class WindowedValues {
getRecordId(),
getRecordOffset(),
causedByDrain(),
- getOpenTelemetryContext());
+ getOpenTelemetryContext(),
+ getValueKind());
}
@Override
@@ -850,7 +941,8 @@ public class WindowedValues {
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
- @Nullable Context openTelemetryContext) {
+ @Nullable Context openTelemetryContext,
+ ValueKind valueKind) {
super(
value,
timestamp,
@@ -858,7 +950,8 @@ public class WindowedValues {
currentRecordId,
currentRecordOffset,
causedByDrain,
- openTelemetryContext);
+ openTelemetryContext,
+ valueKind);
this.windows = checkNotNull(windows);
}
@@ -877,7 +970,8 @@ public class WindowedValues {
getRecordId(),
getRecordOffset(),
causedByDrain(),
- getOpenTelemetryContext());
+ getOpenTelemetryContext(),
+ getValueKind());
}
@Override
@@ -1047,6 +1141,7 @@ public class WindowedValues {
windowedElem.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
+ .setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind()))
.build();
ByteArrayCoder.of().encode(em.toByteArray(), outStream);
@@ -1067,6 +1162,7 @@ public class WindowedValues {
PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream);
CausedByDrain causedByDrain = CausedByDrain.NORMAL;
io.opentelemetry.context.Context openTelemetryContext = null;
+ ValueKind valueKind = ValueKind.INSERT;
if (isMetadataSupported() && paneInfo.isElementMetadata()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
@@ -1075,13 +1171,14 @@ public class WindowedValues {
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
openTelemetryContext = OpenTelemetryContextPropagator.read(elementMetadata);
+ valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
}
T value = valueCoder.decode(inStream, context);
// Because there are some remaining (incorrect) uses of WindowedValue with no windows,
// we call this deprecated no-validation path when decoding
return WindowedValues.createWithoutValidation(
- value, timestamp, windows, paneInfo, causedByDrain, openTelemetryContext);
+ value, timestamp, windows, paneInfo, causedByDrain, openTelemetryContext, valueKind);
}
@Override
@@ -1211,7 +1308,18 @@ public class WindowedValues {
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
- return new ParamWindowedValueCoder<>(valueCoder, windowCoder, timestamp, windows, paneInfo);
+ return of(valueCoder, windowCoder, timestamp, windows, paneInfo, ValueKind.INSERT);
+ }
+
+ public static <T> ParamWindowedValueCoder<T> of(
+ Coder<T> valueCoder,
+ Coder<? extends BoundedWindow> windowCoder,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo paneInfo,
+ ValueKind valueKind) {
+ return new ParamWindowedValueCoder<>(
+ valueCoder, windowCoder, timestamp, windows, paneInfo, valueKind);
}
/**
@@ -1226,7 +1334,8 @@ public class WindowedValues {
windowCoder,
BoundedWindow.TIMESTAMP_MIN_VALUE,
GLOBAL_WINDOWS,
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ ValueKind.INSERT);
}
/**
@@ -1244,15 +1353,31 @@ public class WindowedValues {
Coder<? extends BoundedWindow> windowCoder,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
+ PaneInfo paneInfo,
+ ValueKind valueKind) {
super(valueCoder, windowCoder);
- this.windowedValuePrototype = WindowedValues.of(EMPTY_BYTES, timestamp, windows, paneInfo);
+ this.windowedValuePrototype =
+ WindowedValues.of(
+ EMPTY_BYTES,
+ timestamp,
+ windows,
+ paneInfo,
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ null,
+ valueKind);
}
@Override
public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) {
return new ParamWindowedValueCoder<>(
- valueCoder, getWindowCoder(), getTimestamp(), getWindows(), getPaneInfo());
+ valueCoder,
+ getWindowCoder(),
+ getTimestamp(),
+ getWindows(),
+ getPaneInfo(),
+ getValueKind());
}
@Override
@@ -1290,6 +1415,10 @@ public class WindowedValues {
valueCoder.registerByteSizeObserver(value.getValue(), observer);
}
+ public ValueKind getValueKind() {
+ return windowedValuePrototype.getValueKind();
+ }
+
public Instant getTimestamp() {
return windowedValuePrototype.getTimestamp();
}
@@ -1309,7 +1438,15 @@ public class WindowedValues {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
WindowedValue<byte[]> windowedValue =
WindowedValues.of(
- EMPTY_BYTES, from.getTimestamp(), from.getWindows(), from.getPaneInfo());
+ EMPTY_BYTES,
+ from.getTimestamp(),
+ from.getWindows(),
+ from.getPaneInfo(),
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ null,
+ from.getValueKind());
WindowedValues.FullWindowedValueCoder<byte[]> windowedValueCoder =
WindowedValues.FullWindowedValueCoder.of(ByteArrayCoder.of(), from.getWindowCoder());
try {
@@ -1337,7 +1474,8 @@ public class WindowedValues {
windowCoder,
windowedValue.getTimestamp(),
windowedValue.getWindows(),
- windowedValue.getPaneInfo());
+ windowedValue.getPaneInfo(),
+ windowedValue.getValueKind());
} catch (IOException e) {
throw new RuntimeException(
"Unable to decode constant members from payload for ParamWindowedValueCoder: ", e);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 830ad654b45..3689b1be7db 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -110,7 +111,8 @@ public class WindowedValueTest {
null,
null,
CausedByDrain.CAUSED_BY_DRAIN,
- context); // drain is persisted as part of metadata
+ context,
+ ValueKind.DELETE); // drain is persisted as part of metadata
Coder<WindowedValue<String>> windowedValueCoder =
WindowedValues.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder());
@@ -125,6 +127,7 @@ public class WindowedValueTest {
Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray());
Assert.assertEquals(CausedByDrain.CAUSED_BY_DRAIN, value.causedByDrain());
Assert.assertNotNull(value.getOpenTelemetryContext());
+ Assert.assertEquals(ValueKind.DELETE, value.getValueKind());
}
@Test
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 68fdcdbf0a9..0fbc92c1f7d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2365,7 +2365,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
null,
null,
currentTimer.causedByDrain(),
- null));
+ null,
+ currentElement.getValueKind()));
}
@Override