This is an automated email from the ASF dual-hosted git repository.

ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 399d9d7ea6b Introduce ValueKind to Java and add to WindowedValue 
(#38315)
399d9d7ea6b is described below

commit 399d9d7ea6b510195741049523cda19eb94c1569
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon May 11 10:42:09 2026 -0400

    Introduce ValueKind to Java and add to WindowedValue (#38315)
    
    * introduce ValueKind to Java and add to WindowedValue
    
    * map UNSPECIFIED to INSERT; cleanup
    
    * rebase remnants
    
    * adjust equals and hashcode methods to include valuekind
    
    * use existing coder test
    
    * compile fixes
    
    * compile fixes in dataflow
    
    * compile fixes in dataflow
    
    * add test to WindmillKeyedWorkItem
    
    * compile fixes
---
 .../runners/core/LateDataDroppingDoFnRunner.java   |   3 +-
 ...TimeBoundedSplittableProcessElementInvoker.java |   6 +-
 .../core/SplittableParDoViaKeyedWorkItems.java     |   3 +-
 .../beam/runners/dataflow/BatchViewOverrides.java  |   6 +
 .../dataflow/worker/UngroupedWindmillReader.java   |  10 +-
 .../dataflow/worker/WindmillKeyedWorkItem.java     |  14 +-
 .../dataflow/worker/util/ValueInEmptyWindows.java  |   6 +
 .../dataflow/worker/WindmillKeyedWorkItemTest.java |  43 ++++
 .../apache/beam/runners/spark/util/TimerUtils.java |   6 +
 .../org/apache/beam/sdk/values/OutputBuilder.java  |   2 +
 .../beam/sdk/values/ValueInSingleWindow.java       |  52 ++++-
 .../java/org/apache/beam/sdk/values/ValueKind.java |  40 ++++
 .../org/apache/beam/sdk/values/ValueKindUtil.java  |  54 +++++
 .../org/apache/beam/sdk/values/WindowedValue.java  |   4 +
 .../org/apache/beam/sdk/values/WindowedValues.java | 230 ++++++++++++++++-----
 .../apache/beam/sdk/util/WindowedValueTest.java    |   5 +-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |   3 +-
 17 files changed, 428 insertions(+), 59 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 2ce94533d9e..41052a76f13 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -152,7 +152,8 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, 
W extends BoundedWin
                     element.getRecordId(),
                     element.getRecordOffset(),
                     element.causedByDrain(),
-                    element.getOpenTelemetryContext()));
+                    element.getOpenTelemetryContext(),
+                    element.getValueKind()));
           }
         }
       }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index dc84b15c89f..ebd88442b21 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -452,7 +452,8 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
               element.getRecordId(),
               element.getRecordOffset(),
               element.causedByDrain(),
-              element.getOpenTelemetryContext()));
+              element.getOpenTelemetryContext(),
+              element.getValueKind()));
     }
 
     @Override
@@ -476,7 +477,8 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
               element.getRecordId(),
               element.getRecordOffset(),
               element.causedByDrain(),
-              element.getOpenTelemetryContext()));
+              element.getOpenTelemetryContext(),
+              element.getValueKind()));
     }
 
     @Override
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 7a0f984479a..424ea567115 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -473,7 +473,8 @@ public class SplittableParDoViaKeyedWorkItems {
                   read.getRecordId(),
                   read.getRecordOffset(),
                   CausedByDrain.CAUSED_BY_DRAIN,
-                  read.getOpenTelemetryContext());
+                  read.getOpenTelemetryContext(),
+                  read.getValueKind());
         }
         elementAndRestriction = KV.of(read, restrictionState.read());
         watermarkEstimatorStateT = watermarkEstimatorState.read();
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index e5ac4e99285..b15b3f3834d 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -74,6 +74,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.ValueKind;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
@@ -1415,6 +1416,11 @@ class BatchViewOverrides {
       return null;
     }
 
+    @Override
+    public ValueKind getValueKind() {
+      return ValueKind.INSERT;
+    }
+
     @Override
     public Iterable<WindowedValue<T>> explodeWindows() {
       return Collections.emptyList();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
index 8ef0bf80323..0b883c04846 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueKind;
+import org.apache.beam.sdk.values.ValueKindUtil;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
@@ -139,6 +141,7 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
        * drain happened upstream
        */
       CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
+      ValueKind valueKind = ValueKind.INSERT;
       if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
         BeamFnApi.Elements.ElementMetadata elementMetadata =
             WindmillSink.decodeAdditionalMetadata(windowsCoder, 
message.getMetadata());
@@ -146,6 +149,7 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
             elementMetadata.getDrain() == 
BeamFnApi.Elements.DrainMode.Enum.DRAINING
                 ? CausedByDrain.CAUSED_BY_DRAIN
                 : CausedByDrain.NORMAL;
+        valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
       }
       if (valueCoder instanceof KvCoder) {
         KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
@@ -164,7 +168,8 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
             null,
             null,
             drainingValueFromUpstream,
-            null);
+            null,
+            valueKind);
       } else {
         notifyElementRead(data.available() + metadata.available());
         // todo #37030 parse context from previous stage
@@ -176,7 +181,8 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
             null,
             null,
             drainingValueFromUpstream,
-            null);
+            null,
+            valueKind);
       }
     }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
index 4c968030eef..033e4186158 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
@@ -42,6 +42,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.ValueKind;
+import org.apache.beam.sdk.values.ValueKindUtil;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate;
@@ -148,6 +150,7 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
        * drain happened upstream
        */
       CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
+      ValueKind valueKind = ValueKind.INSERT;
       if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
         BeamFnApi.Elements.ElementMetadata elementMetadata =
             WindmillSink.decodeAdditionalMetadata(windowsCoder, 
message.getMetadata());
@@ -155,11 +158,20 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
             elementMetadata.getDrain() == 
BeamFnApi.Elements.DrainMode.Enum.DRAINING
                 ? CausedByDrain.CAUSED_BY_DRAIN
                 : CausedByDrain.NORMAL;
+        valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
       }
       InputStream inputStream = message.getData().newInput();
       ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
       return WindowedValues.of(
-          value, timestamp, windows, paneInfo, null, null, 
drainingValueFromUpstream, null);
+          value,
+          timestamp,
+          windows,
+          paneInfo,
+          null,
+          null,
+          drainingValueFromUpstream,
+          null,
+          valueKind);
     } catch (RuntimeException | IOException e) {
       if (!skipUndecodableElements) {
         throw new RuntimeException(e);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java
index 67ffdc9f3b8..d5a493577f4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.ValueKind;
 import org.apache.beam.sdk.values.WindowedValue;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -71,6 +72,11 @@ public class ValueInEmptyWindows<T> implements 
WindowedValue<T> {
     return null;
   }
 
+  @Override
+  public ValueKind getValueKind() {
+    return ValueKind.INSERT;
+  }
+
   @Override
   public Iterable<WindowedValue<T>> explodeWindows() {
     return Collections.emptyList();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index c1568058435..e6738246b53 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -46,6 +46,7 @@ import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.ValueKind;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
@@ -374,6 +375,48 @@ public class WindmillKeyedWorkItemTest {
     assertThat(
         keyedWorkItem.timersIterable(),
         Matchers.contains(makeDrainingTimer(STATE_NAMESPACE_2, 3, 
TimeDomain.EVENT_TIME)));
+    WindowedValues.WindowedValueCoder.setMetadataNotSupported();
+  }
+
+  @Test
+  public void testValueKindPropagated() throws Exception {
+    WindowedValues.WindowedValueCoder.setMetadataSupported();
+    Windmill.WorkItem.Builder workItem =
+        Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
+    Windmill.InputMessageBundle.Builder chunk1 = 
workItem.addMessageBundlesBuilder();
+    chunk1.setSourceComputationId("computation");
+    addElementWithMetadata(
+        chunk1,
+        5,
+        "hello",
+        WINDOW_1,
+        paneInfo(0),
+        BeamFnApi.Elements.ElementMetadata.newBuilder()
+            .setValueKind(BeamFnApi.Elements.ValueKind.Enum.UPDATE_AFTER)
+            .build());
+    addElementWithMetadata(
+        chunk1,
+        7,
+        "world",
+        WINDOW_2,
+        paneInfo(2),
+        BeamFnApi.Elements.ElementMetadata.newBuilder()
+            .setValueKind(BeamFnApi.Elements.ValueKind.Enum.DELETE)
+            .build());
+    KeyedWorkItem<String, String> keyedWorkItem =
+        new WindmillKeyedWorkItem<>(
+            KEY,
+            workItem.build(),
+            WINDOW_CODER,
+            WINDOWS_CODER,
+            VALUE_CODER,
+            windmillTagEncoding,
+            true);
+
+    Iterator<WindowedValue<String>> iterator = 
keyedWorkItem.elementsIterable().iterator();
+    Assert.assertEquals(ValueKind.UPDATE_AFTER, 
iterator.next().getValueKind());
+    Assert.assertEquals(ValueKind.DELETE, iterator.next().getValueKind());
+    WindowedValues.WindowedValueCoder.setMetadataNotSupported();
   }
 
   private static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) {
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
index b6dda82239c..2111867d385 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueKind;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
@@ -127,6 +128,11 @@ public class TimerUtils {
       return CausedByDrain.NORMAL;
     }
 
+    @Override
+    public ValueKind getValueKind() {
+      return ValueKind.INSERT;
+    }
+
     @Override
     public @Nullable Long getRecordOffset() {
       return null;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
index ea73a5c35d7..c0894240572 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
@@ -53,5 +53,7 @@ public interface OutputBuilder<T> extends WindowedValue<T> {
 
   OutputBuilder<T> setOpenTelemetryContext(@Nullable Context 
openTelemetryContext);
 
+  OutputBuilder<T> setValueKind(ValueKind valueKind);
+
   void output();
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index afd5aa84520..ef7b40aabe4 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -71,6 +71,8 @@ public abstract class ValueInSingleWindow<T> {
 
   public abstract @Nullable Context getOpenTelemetryContext();
 
+  public abstract ValueKind getValueKind();
+
   // todo #33176 specify additional metadata in the future
   public static <T> ValueInSingleWindow<T> of(
       T value,
@@ -81,6 +83,28 @@ public abstract class ValueInSingleWindow<T> {
       @Nullable Long currentRecordOffset,
       CausedByDrain causedByDrain,
       @Nullable Context openTelemetryContext) {
+    return of(
+        value,
+        timestamp,
+        window,
+        paneInfo,
+        currentRecordId,
+        currentRecordOffset,
+        causedByDrain,
+        openTelemetryContext,
+        ValueKind.INSERT);
+  }
+
+  public static <T> ValueInSingleWindow<T> of(
+      T value,
+      Instant timestamp,
+      BoundedWindow window,
+      PaneInfo paneInfo,
+      @Nullable String currentRecordId,
+      @Nullable Long currentRecordOffset,
+      CausedByDrain causedByDrain,
+      @Nullable Context openTelemetryContext,
+      ValueKind valueKind) {
     return new AutoValue_ValueInSingleWindow<>(
         value,
         timestamp,
@@ -89,12 +113,22 @@ public abstract class ValueInSingleWindow<T> {
         currentRecordId,
         currentRecordOffset,
         causedByDrain,
-        openTelemetryContext);
+        openTelemetryContext,
+        valueKind);
   }
 
   public static <T> ValueInSingleWindow<T> of(
       T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
-    return of(value, timestamp, window, paneInfo, null, null, 
CausedByDrain.NORMAL, null);
+    return of(
+        value,
+        timestamp,
+        window,
+        paneInfo,
+        null,
+        null,
+        CausedByDrain.NORMAL,
+        null,
+        ValueKind.INSERT);
   }
 
   /** A coder for {@link ValueInSingleWindow}. */
@@ -144,9 +178,9 @@ public abstract class ValueInSingleWindow<T> {
         io.opentelemetry.context.Context openTelemetryContext =
             windowedElem.getOpenTelemetryContext();
         if (openTelemetryContext != null) {
-
           OpenTelemetryContextPropagator.set(openTelemetryContext, builder);
         }
+        
builder.setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind()));
         BeamFnApi.Elements.ElementMetadata metadata = builder.build();
         ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
       }
@@ -168,6 +202,7 @@ public abstract class ValueInSingleWindow<T> {
       PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
       CausedByDrain causedByDrain = CausedByDrain.NORMAL;
       io.opentelemetry.context.@Nullable Context openTelemetryContext = null;
+      ValueKind valueKind = ValueKind.INSERT;
       if (WindowedValues.WindowedValueCoder.isMetadataSupported() && 
paneInfo.isElementMetadata()) {
         BeamFnApi.Elements.ElementMetadata elementMetadata =
             
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
@@ -176,12 +211,21 @@ public abstract class ValueInSingleWindow<T> {
                 ? CausedByDrain.CAUSED_BY_DRAIN
                 : CausedByDrain.NORMAL;
         openTelemetryContext = 
OpenTelemetryContextPropagator.read(elementMetadata);
+        valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
       }
 
       T value = valueCoder.decode(inStream, context);
       // todo #33176 specify additional metadata in the future
       return new AutoValue_ValueInSingleWindow<>(
-          value, timestamp, window, paneInfo, null, null, causedByDrain, 
openTelemetryContext);
+          value,
+          timestamp,
+          window,
+          paneInfo,
+          null,
+          null,
+          causedByDrain,
+          openTelemetryContext,
+          valueKind);
     }
 
     @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java
new file mode 100644
index 00000000000..7a190496ca2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+/** The type of change operation represented by a Change Data Capture (CDC) 
record. */
+public enum ValueKind {
+  /** Indicates a new record was created in the source system. */
+  INSERT,
+
+  /**
+   * Indicates the state of a record immediately <b>before</b> an update 
occurred. This is typically
+   * used to identify the previous values of modified columns or to locate the 
record via its
+   * primary key.
+   */
+  UPDATE_BEFORE,
+
+  /**
+   * Indicates the state of a record immediately <b>after</b> an update 
occurred. Represents the
+   * current, valid state of the record following the change.
+   */
+  UPDATE_AFTER,
+
+  /** Indicates that an existing record was removed from the source system. */
+  DELETE
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java
new file mode 100644
index 00000000000..46ceb8f6247
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+
+/** Utility class for converting between {@link ValueKind} and {@link 
Elements.ValueKind.Enum}. */
+public final class ValueKindUtil {
+  public static Elements.ValueKind.Enum toProto(ValueKind valueKind) {
+    switch (valueKind) {
+      case INSERT:
+        return Elements.ValueKind.Enum.INSERT;
+      case UPDATE_BEFORE:
+        return Elements.ValueKind.Enum.UPDATE_BEFORE;
+      case UPDATE_AFTER:
+        return Elements.ValueKind.Enum.UPDATE_AFTER;
+      case DELETE:
+        return Elements.ValueKind.Enum.DELETE;
+      default:
+        throw new IllegalArgumentException("Unknown ValueKind: " + valueKind);
+    }
+  }
+
+  public static ValueKind fromProto(Elements.ValueKind.Enum proto) {
+    switch (proto) {
+      case VALUE_KIND_UNSPECIFIED:
+      case INSERT:
+        return ValueKind.INSERT;
+      case UPDATE_BEFORE:
+        return ValueKind.UPDATE_BEFORE;
+      case UPDATE_AFTER:
+        return ValueKind.UPDATE_AFTER;
+      case DELETE:
+        return ValueKind.DELETE;
+      default:
+        throw new IllegalArgumentException("Unknown ValueKind: " + proto);
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
index 3bf51b26630..13796018d7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
@@ -58,6 +58,10 @@ public interface WindowedValue<T> {
 
   CausedByDrain causedByDrain();
 
+  /** Returns the {@link ValueKind} associated with this WindowedValue. */
+  @Pure
+  ValueKind getValueKind();
+
   /**
    * A representation of each of the actual values represented by this 
compressed {@link
    * WindowedValue}, one per window.
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
index 58595875fd3..33700a9dc0d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
@@ -83,7 +83,8 @@ public class WindowedValues {
         .setRecordOffset(template.getRecordOffset())
         .setRecordId(template.getRecordId())
         .setCausedByDrain(template.causedByDrain())
-        .setOpenTelemetryContext(template.getOpenTelemetryContext());
+        .setOpenTelemetryContext(template.getOpenTelemetryContext())
+        .setValueKind(template.getValueKind());
   }
 
   public static class Builder<T> implements OutputBuilder<T> {
@@ -107,6 +108,7 @@ public class WindowedValues {
     private @Nullable Long recordOffset;
     private CausedByDrain causedByDrain = CausedByDrain.NORMAL;
     private @Nullable Context openTelemetryContext;
+    private ValueKind valueKind = ValueKind.INSERT;
 
     @Override
     public Builder<T> setValue(T value) {
@@ -163,6 +165,13 @@ public class WindowedValues {
       return this;
     }
 
+    @Override
+    public Builder<T> setValueKind(ValueKind valueKind) {
+      checkStateNotNull(valueKind, "ValueKind is null");
+      this.valueKind = valueKind;
+      return this;
+    }
+
     public Builder<T> setReceiver(WindowedValueReceiver<T> receiver) {
       this.receiver = receiver;
       return this;
@@ -222,6 +231,12 @@ public class WindowedValues {
       return causedByDrain;
     }
 
+    @Override
+    public ValueKind getValueKind() {
+      checkStateNotNull(valueKind, "ValueKind not set");
+      return valueKind;
+    }
+
     @Override
     public Collection<Builder<T>> explodeWindows() {
       throw new UnsupportedOperationException(
@@ -258,7 +273,8 @@ public class WindowedValues {
           getRecordId(),
           getRecordOffset(),
           causedByDrain(),
-          getOpenTelemetryContext());
+          getOpenTelemetryContext(),
+          getValueKind());
     }
 
     @Override
@@ -269,6 +285,7 @@ public class WindowedValues {
           .add("windows", getWindows())
           .add("paneInfo", getPaneInfo())
           .add("causedByDrain", causedByDrain())
+          .add("valueKind", getValueKind())
           .add("receiver", receiver)
           .toString();
     }
@@ -276,7 +293,16 @@ public class WindowedValues {
 
   public static <T> WindowedValue<T> of(
       T value, Instant timestamp, Collection<? extends BoundedWindow> windows, 
PaneInfo paneInfo) {
-    return of(value, timestamp, windows, paneInfo, null, null, 
CausedByDrain.NORMAL, null);
+    return of(
+        value,
+        timestamp,
+        windows,
+        paneInfo,
+        null,
+        null,
+        CausedByDrain.NORMAL,
+        null,
+        ValueKind.INSERT);
   }
 
   /** Returns a {@code WindowedValue} with the given value, timestamp, and 
windows. */
@@ -288,10 +314,12 @@ public class WindowedValues {
       @Nullable String currentRecordId,
       @Nullable Long currentRecordOffset,
       CausedByDrain causedByDrain,
-      @Nullable Context openTelemetryContext) {
+      @Nullable Context openTelemetryContext,
+      ValueKind valueKind) {
     checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it 
was null");
     checkArgument(windows.size() > 0, "WindowedValue requires windows, but 
there were none");
     checkArgument(causedByDrain != null, "WindowedValue requires 
CausedByDrain, but it was null");
+    checkArgument(valueKind != null, "WindowedValue requires ValueKind, but it 
was null");
     if (windows.size() == 1) {
       return of(
           value,
@@ -301,7 +329,8 @@ public class WindowedValues {
           currentRecordId,
           currentRecordOffset,
           causedByDrain,
-          openTelemetryContext);
+          openTelemetryContext,
+          valueKind);
     } else {
       return new TimestampedValueInMultipleWindows<>(
           value,
@@ -311,7 +340,8 @@ public class WindowedValues {
           currentRecordId,
           currentRecordOffset,
           causedByDrain,
-          openTelemetryContext);
+          openTelemetryContext,
+          valueKind);
     }
   }
 
@@ -323,7 +353,8 @@ public class WindowedValues {
       Collection<? extends BoundedWindow> windows,
       PaneInfo paneInfo,
       CausedByDrain causedByDrain,
-      @Nullable Context openTelemetryContext) {
+      @Nullable Context openTelemetryContext,
+      ValueKind valueKind) {
     if (windows.size() == 1) {
       return of(
           value,
@@ -333,10 +364,19 @@ public class WindowedValues {
           null,
           null,
           causedByDrain,
-          openTelemetryContext);
+          openTelemetryContext,
+          valueKind);
     } else {
       return new TimestampedValueInMultipleWindows<>(
-          value, timestamp, windows, paneInfo, null, null, causedByDrain, 
openTelemetryContext);
+          value,
+          timestamp,
+          windows,
+          paneInfo,
+          null,
+          null,
+          causedByDrain,
+          openTelemetryContext,
+          valueKind);
     }
   }
 
@@ -345,7 +385,16 @@ public class WindowedValues {
       T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
     checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it 
was null");
 
-    return of(value, timestamp, window, paneInfo, null, null, 
CausedByDrain.NORMAL, null);
+    return of(
+        value,
+        timestamp,
+        window,
+        paneInfo,
+        null,
+        null,
+        CausedByDrain.NORMAL,
+        null,
+        ValueKind.INSERT);
   }
 
   /** Returns a {@code WindowedValue} with the given value, timestamp, and 
window. */
@@ -357,9 +406,11 @@ public class WindowedValues {
       @Nullable String currentRecordId,
       @Nullable Long currentRecordOffset,
       CausedByDrain causedByDrain,
-      @Nullable Context openTelemetryContext) {
+      @Nullable Context openTelemetryContext,
+      ValueKind valueKind) {
     checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it 
was null");
     checkArgument(causedByDrain != null, "WindowedValue requires 
CausedByDrain, but it was null");
+    checkArgument(valueKind != null, "WindowedValue requires ValueKind, but it 
was null");
 
     boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
     if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
@@ -369,7 +420,8 @@ public class WindowedValues {
           currentRecordId,
           currentRecordOffset,
           causedByDrain,
-          openTelemetryContext);
+          openTelemetryContext,
+          valueKind);
     } else if (isGlobal) {
       return new TimestampedValueInGlobalWindow<>(
           value,
@@ -378,7 +430,8 @@ public class WindowedValues {
           currentRecordId,
           currentRecordOffset,
           causedByDrain,
-          openTelemetryContext);
+          openTelemetryContext,
+          valueKind);
     } else {
       return new TimestampedValueInSingleWindow<>(
           value,
@@ -388,7 +441,8 @@ public class WindowedValues {
           currentRecordId,
           currentRecordOffset,
           causedByDrain,
-          openTelemetryContext);
+          openTelemetryContext,
+          valueKind);
     }
   }
 
@@ -398,7 +452,7 @@ public class WindowedValues {
    */
   public static <T> WindowedValue<T> valueInGlobalWindow(T value) {
     return new ValueInGlobalWindow<>(
-        value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL, null);
+        value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL, null, 
ValueKind.INSERT);
   }
 
   /**
@@ -406,7 +460,8 @@ public class WindowedValues {
    * default timestamp and the specified pane.
    */
   public static <T> WindowedValue<T> valueInGlobalWindow(T value, PaneInfo 
paneInfo) {
-    return new ValueInGlobalWindow<>(value, paneInfo, null, null, 
CausedByDrain.NORMAL, null);
+    return new ValueInGlobalWindow<>(
+        value, paneInfo, null, null, CausedByDrain.NORMAL, null, 
ValueKind.INSERT);
   }
 
   /**
@@ -418,7 +473,14 @@ public class WindowedValues {
       return valueInGlobalWindow(value);
     } else {
       return new TimestampedValueInGlobalWindow<>(
-          value, timestamp, PaneInfo.NO_FIRING, null, null, 
CausedByDrain.NORMAL, null);
+          value,
+          timestamp,
+          PaneInfo.NO_FIRING,
+          null,
+          null,
+          CausedByDrain.NORMAL,
+          null,
+          ValueKind.INSERT);
     }
   }
 
@@ -432,7 +494,7 @@ public class WindowedValues {
       return timestampedValueInGlobalWindow(value, timestamp);
     } else {
       return new TimestampedValueInGlobalWindow<>(
-          value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL, null);
+          value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL, null, 
ValueKind.INSERT);
     }
   }
 
@@ -450,7 +512,8 @@ public class WindowedValues {
         windowedValue.getRecordId(),
         windowedValue.getRecordOffset(),
         windowedValue.causedByDrain(),
-        windowedValue.getOpenTelemetryContext());
+        windowedValue.getOpenTelemetryContext(),
+        windowedValue.getValueKind());
   }
 
   public static <T> boolean equals(
@@ -469,7 +532,8 @@ public class WindowedValues {
     return left.getTimestamp().isEqual(right.getTimestamp())
         && Objects.equals(left.getValue(), right.getValue())
         && Iterables.elementsEqual(left.getWindows(), right.getWindows())
-        && Objects.equals(left.getPaneInfo(), right.getPaneInfo());
+        && Objects.equals(left.getPaneInfo(), right.getPaneInfo())
+        && Objects.equals(left.getValueKind(), right.getValueKind());
   }
 
   public static <T> int hashCode(WindowedValue<T> windowedValue) {
@@ -478,7 +542,8 @@ public class WindowedValues {
         windowedValue.getValue(),
         windowedValue.getTimestamp().getMillis(),
         windowedValue.getWindows(),
-        windowedValue.getPaneInfo());
+        windowedValue.getPaneInfo(),
+        windowedValue.getValueKind());
   }
 
   private static final Collection<? extends BoundedWindow> GLOBAL_WINDOWS =
@@ -503,6 +568,7 @@ public class WindowedValues {
     private final @Nullable Long currentRecordOffset;
     private final CausedByDrain causedByDrain;
     private final @Nullable Context context;
+    private final ValueKind valueKind;
 
     @Override
     public @Nullable String getRecordId() {
@@ -519,19 +585,26 @@ public class WindowedValues {
       return causedByDrain;
     }
 
+    @Override
+    public ValueKind getValueKind() {
+      return valueKind;
+    }
+
     protected SimpleWindowedValue(
         T value,
         PaneInfo paneInfo,
         @Nullable String currentRecordId,
         @Nullable Long currentRecordOffset,
         CausedByDrain causedByDrain,
-        @Nullable Context context) {
+        @Nullable Context context,
+        ValueKind valueKind) {
       this.value = value;
       this.paneInfo = checkNotNull(paneInfo);
       this.currentRecordId = currentRecordId;
       this.currentRecordOffset = currentRecordOffset;
       this.causedByDrain = causedByDrain;
       this.context = context;
+      this.valueKind = checkNotNull(valueKind, "ValueKind is null");
     }
 
     @Override
@@ -586,8 +659,9 @@ public class WindowedValues {
         @Nullable String currentRecordId,
         @Nullable Long currentRecordOffset,
         CausedByDrain causedByDrain,
-        @Nullable Context context) {
-      super(value, pane, currentRecordId, currentRecordOffset, causedByDrain, 
context);
+        @Nullable Context context,
+        ValueKind valueKind) {
+      super(value, pane, currentRecordId, currentRecordOffset, causedByDrain, 
context, valueKind);
     }
 
     @Override
@@ -606,8 +680,10 @@ public class WindowedValues {
         @Nullable String currentRecordId,
         @Nullable Long currentRecordOffset,
         CausedByDrain causedByDrain,
-        @Nullable Context context) {
-      super(value, paneInfo, currentRecordId, currentRecordOffset, 
causedByDrain, context);
+        @Nullable Context context,
+        ValueKind valueKind) {
+      super(
+          value, paneInfo, currentRecordId, currentRecordOffset, 
causedByDrain, context, valueKind);
     }
 
     @Override
@@ -628,7 +704,8 @@ public class WindowedValues {
           getRecordId(),
           getRecordOffset(),
           causedByDrain(),
-          getOpenTelemetryContext());
+          getOpenTelemetryContext(),
+          getValueKind());
     }
 
     @Override
@@ -668,8 +745,10 @@ public class WindowedValues {
         @Nullable String currentRecordId,
         @Nullable Long currentRecordOffset,
         CausedByDrain causedByDrain,
-        @Nullable Context context) {
-      super(value, paneInfo, currentRecordId, currentRecordOffset, 
causedByDrain, context);
+        @Nullable Context context,
+        ValueKind valueKind) {
+      super(
+          value, paneInfo, currentRecordId, currentRecordOffset, 
causedByDrain, context, valueKind);
       this.timestamp = checkNotNull(timestamp);
     }
 
@@ -693,9 +772,17 @@ public class WindowedValues {
         @Nullable String currentRecordId,
         @Nullable Long currentRecordOffset,
         CausedByDrain causedByDrain,
-        @Nullable Context context) {
+        @Nullable Context context,
+        ValueKind valueKind) {
       super(
-          value, timestamp, paneInfo, currentRecordId, currentRecordOffset, 
causedByDrain, context);
+          value,
+          timestamp,
+          paneInfo,
+          currentRecordId,
+          currentRecordOffset,
+          causedByDrain,
+          context,
+          valueKind);
     }
 
     @Override
@@ -717,7 +804,8 @@ public class WindowedValues {
           getRecordId(),
           getRecordOffset(),
           causedByDrain(),
-          getOpenTelemetryContext());
+          getOpenTelemetryContext(),
+          getValueKind());
     }
 
     @Override
@@ -769,7 +857,8 @@ public class WindowedValues {
         @Nullable String currentRecordId,
         @Nullable Long currentRecordOffset,
         CausedByDrain causedByDrain,
-        @Nullable Context openTelemetryContext) {
+        @Nullable Context openTelemetryContext,
+        ValueKind valueKind) {
       super(
           value,
           timestamp,
@@ -777,7 +866,8 @@ public class WindowedValues {
           currentRecordId,
           currentRecordOffset,
           causedByDrain,
-          openTelemetryContext);
+          openTelemetryContext,
+          valueKind);
       this.window = checkNotNull(window);
     }
 
@@ -791,7 +881,8 @@ public class WindowedValues {
           getRecordId(),
           getRecordOffset(),
           causedByDrain(),
-          getOpenTelemetryContext());
+          getOpenTelemetryContext(),
+          getValueKind());
     }
 
     @Override
@@ -850,7 +941,8 @@ public class WindowedValues {
         @Nullable String currentRecordId,
         @Nullable Long currentRecordOffset,
         CausedByDrain causedByDrain,
-        @Nullable Context openTelemetryContext) {
+        @Nullable Context openTelemetryContext,
+        ValueKind valueKind) {
       super(
           value,
           timestamp,
@@ -858,7 +950,8 @@ public class WindowedValues {
           currentRecordId,
           currentRecordOffset,
           causedByDrain,
-          openTelemetryContext);
+          openTelemetryContext,
+          valueKind);
       this.windows = checkNotNull(windows);
     }
 
@@ -877,7 +970,8 @@ public class WindowedValues {
           getRecordId(),
           getRecordOffset(),
           causedByDrain(),
-          getOpenTelemetryContext());
+          getOpenTelemetryContext(),
+          getValueKind());
     }
 
     @Override
@@ -1047,6 +1141,7 @@ public class WindowedValues {
                     windowedElem.causedByDrain() == 
CausedByDrain.CAUSED_BY_DRAIN
                         ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
                         : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
+                
.setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind()))
                 .build();
 
         ByteArrayCoder.of().encode(em.toByteArray(), outStream);
@@ -1067,6 +1162,7 @@ public class WindowedValues {
       PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream);
       CausedByDrain causedByDrain = CausedByDrain.NORMAL;
       io.opentelemetry.context.Context openTelemetryContext = null;
+      ValueKind valueKind = ValueKind.INSERT;
       if (isMetadataSupported() && paneInfo.isElementMetadata()) {
         BeamFnApi.Elements.ElementMetadata elementMetadata =
             
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
@@ -1075,13 +1171,14 @@ public class WindowedValues {
                 ? CausedByDrain.CAUSED_BY_DRAIN
                 : CausedByDrain.NORMAL;
         openTelemetryContext = 
OpenTelemetryContextPropagator.read(elementMetadata);
+        valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
       }
       T value = valueCoder.decode(inStream, context);
 
       // Because there are some remaining (incorrect) uses of WindowedValue 
with no windows,
       // we call this deprecated no-validation path when decoding
       return WindowedValues.createWithoutValidation(
-          value, timestamp, windows, paneInfo, causedByDrain, 
openTelemetryContext);
+          value, timestamp, windows, paneInfo, causedByDrain, 
openTelemetryContext, valueKind);
     }
 
     @Override
@@ -1211,7 +1308,18 @@ public class WindowedValues {
         Instant timestamp,
         Collection<? extends BoundedWindow> windows,
         PaneInfo paneInfo) {
-      return new ParamWindowedValueCoder<>(valueCoder, windowCoder, timestamp, 
windows, paneInfo);
+      return of(valueCoder, windowCoder, timestamp, windows, paneInfo, 
ValueKind.INSERT);
+    }
+
+    public static <T> ParamWindowedValueCoder<T> of(
+        Coder<T> valueCoder,
+        Coder<? extends BoundedWindow> windowCoder,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo,
+        ValueKind valueKind) {
+      return new ParamWindowedValueCoder<>(
+          valueCoder, windowCoder, timestamp, windows, paneInfo, valueKind);
     }
 
     /**
@@ -1226,7 +1334,8 @@ public class WindowedValues {
           windowCoder,
           BoundedWindow.TIMESTAMP_MIN_VALUE,
           GLOBAL_WINDOWS,
-          PaneInfo.NO_FIRING);
+          PaneInfo.NO_FIRING,
+          ValueKind.INSERT);
     }
 
     /**
@@ -1244,15 +1353,31 @@ public class WindowedValues {
         Coder<? extends BoundedWindow> windowCoder,
         Instant timestamp,
         Collection<? extends BoundedWindow> windows,
-        PaneInfo paneInfo) {
+        PaneInfo paneInfo,
+        ValueKind valueKind) {
       super(valueCoder, windowCoder);
-      this.windowedValuePrototype = WindowedValues.of(EMPTY_BYTES, timestamp, 
windows, paneInfo);
+      this.windowedValuePrototype =
+          WindowedValues.of(
+              EMPTY_BYTES,
+              timestamp,
+              windows,
+              paneInfo,
+              null,
+              null,
+              CausedByDrain.NORMAL,
+              null,
+              valueKind);
     }
 
     @Override
     public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> 
valueCoder) {
       return new ParamWindowedValueCoder<>(
-          valueCoder, getWindowCoder(), getTimestamp(), getWindows(), 
getPaneInfo());
+          valueCoder,
+          getWindowCoder(),
+          getTimestamp(),
+          getWindows(),
+          getPaneInfo(),
+          getValueKind());
     }
 
     @Override
@@ -1290,6 +1415,10 @@ public class WindowedValues {
       valueCoder.registerByteSizeObserver(value.getValue(), observer);
     }
 
+    public ValueKind getValueKind() {
+      return windowedValuePrototype.getValueKind();
+    }
+
     public Instant getTimestamp() {
       return windowedValuePrototype.getTimestamp();
     }
@@ -1309,7 +1438,15 @@ public class WindowedValues {
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       WindowedValue<byte[]> windowedValue =
           WindowedValues.of(
-              EMPTY_BYTES, from.getTimestamp(), from.getWindows(), 
from.getPaneInfo());
+              EMPTY_BYTES,
+              from.getTimestamp(),
+              from.getWindows(),
+              from.getPaneInfo(),
+              null,
+              null,
+              CausedByDrain.NORMAL,
+              null,
+              from.getValueKind());
       WindowedValues.FullWindowedValueCoder<byte[]> windowedValueCoder =
           WindowedValues.FullWindowedValueCoder.of(ByteArrayCoder.of(), 
from.getWindowCoder());
       try {
@@ -1337,7 +1474,8 @@ public class WindowedValues {
             windowCoder,
             windowedValue.getTimestamp(),
             windowedValue.getWindows(),
-            windowedValue.getPaneInfo());
+            windowedValue.getPaneInfo(),
+            windowedValue.getValueKind());
       } catch (IOException e) {
         throw new RuntimeException(
             "Unable to decode constant members from payload for 
ParamWindowedValueCoder: ", e);
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 830ad654b45..3689b1be7db 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -39,6 +39,7 @@ import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.ValueKind;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -110,7 +111,8 @@ public class WindowedValueTest {
             null,
             null,
             CausedByDrain.CAUSED_BY_DRAIN,
-            context); // drain is persisted as part of metadata
+            context,
+            ValueKind.DELETE); // drain is persisted as part of metadata
 
     Coder<WindowedValue<String>> windowedValueCoder =
         WindowedValues.getFullCoder(StringUtf8Coder.of(), 
IntervalWindow.getCoder());
@@ -125,6 +127,7 @@ public class WindowedValueTest {
     Assert.assertArrayEquals(value.getWindows().toArray(), 
decodedValue.getWindows().toArray());
     Assert.assertEquals(CausedByDrain.CAUSED_BY_DRAIN, value.causedByDrain());
     Assert.assertNotNull(value.getOpenTelemetryContext());
+    Assert.assertEquals(ValueKind.DELETE, value.getValueKind());
   }
 
   @Test
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 68fdcdbf0a9..0fbc92c1f7d 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2365,7 +2365,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
                 null,
                 null,
                 currentTimer.causedByDrain(),
-                null));
+                null,
+                currentElement.getValueKind()));
       }
 
       @Override

Reply via email to