This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 92c5c98a0e7 [FLINK-37598][table] Support list and map state in PTFs
92c5c98a0e7 is described below

commit 92c5c98a0e76e052a8fe27478557e772d7a2503c
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Apr 8 13:10:06 2025 +0200

    [FLINK-37598][table] Support list and map state in PTFs
    
    This closes #26396.
---
 docs/content.zh/docs/dev/table/functions/ptfs.md   |  79 ++++++++++
 docs/content/docs/dev/table/functions/ptfs.md      |  79 ++++++++++
 .../apache/flink/table/api/dataview/DataView.java  |   6 +-
 .../apache/flink/table/api/dataview/ListView.java  |  26 ++--
 .../apache/flink/table/api/dataview/MapView.java   |  32 ++--
 .../table/functions/ProcessTableFunction.java      |  50 +++++++
 .../stream/StreamExecProcessTableFunction.java     |   8 +-
 .../codegen/ProcessTableRunnerGenerator.scala      | 166 ++++++++++++++-------
 .../table/planner/plan/utils/AggregateUtil.scala   |   3 +-
 .../table/planner/plan/utils/aggregation.scala     |   2 +-
 .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java |   2 +-
 .../stream/ProcessTableFunctionSemanticTests.java  |   4 +-
 .../stream/ProcessTableFunctionTestPrograms.java   |  36 +++++
 .../exec/stream/ProcessTableFunctionTestUtils.java |  89 ++++++++++-
 .../table/runtime/dataview}/DataViewUtils.java     |  53 ++++---
 .../table/runtime/dataview/StateListView.java      |  22 ++-
 .../flink/table/runtime/dataview/StateMapView.java |  24 ++-
 .../runtime/generated/ProcessTableRunner.java      |  59 +++++---
 .../operators/process/ProcessTableOperator.java    |  72 +++++++--
 .../operators/process/RuntimeStateInfo.java        |  12 +-
 20 files changed, 669 insertions(+), 155 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/functions/ptfs.md b/docs/content.zh/docs/dev/table/functions/ptfs.md
index 6c613a115e8..835aeb03350 100644
--- a/docs/content.zh/docs/dev/table/functions/ptfs.md
+++ b/docs/content.zh/docs/dev/table/functions/ptfs.md
@@ -708,6 +708,85 @@ class CountingFunction extends ProcessTableFunction<String> {
 {{< /tab >}}
 {{< /tabs >}}
 
+### Large State
+
+Flink's state backends provide different types of state to efficiently handle large state.
+
+Currently, PTFs support three types of state:
+
+- **Value state**: Represents a single value.
+- **List state**: Represents a list of values, supporting operations like appending, removing, and iterating.
+- **Map state**: Represents a map of key-value pairs for efficient lookups, modifications, and removal of individual entries.
+
+By default, state entries in a PTF are represented as value state. This means that every state entry is fully read from
+the state backend when the evaluation method is called, and the value is written back to the state backend once the
+evaluation method finishes.
+
+To optimize state access and avoid unnecessary (de)serialization, state entries can be declared as:
+- `org.apache.flink.table.api.dataview.ListView` (for list state)
+- `org.apache.flink.table.api.dataview.MapView` (for map state)
+
+These provide direct views to the underlying Flink state backend.
+
+For example, when using a `MapView`, accessing a value via `MapView#get` will only deserialize the value associated with
+the specified key. This allows for efficient access to individual entries without needing to load the entire map. This
+approach is particularly useful when the map does not fit entirely into memory.
+
+{{< hint info >}}
+State TTL is applied individually to each entry in a list or map, allowing for fine-grained expiration control over state
+elements.
+{{< /hint >}}
+
+The following example demonstrates how to declare and use a `MapView`. It assumes the PTF processes a table with the
+schema `(userId, eventId, ...)`, partitioned by `userId`, with a high cardinality of distinct `eventId` values. For this
+use case, it is generally recommended to partition the table by both `userId` and `eventId`. For the purposes of this
+example, the large state is stored as map state.
+
+{{< tabs "1837eeed-3d13-455c-8e2f-5e164da9f844" >}}
+{{< tab "Java" >}}
+```java
+// Function that uses a map view for storing a large map for an event history per user
+class LargeHistoryFunction extends ProcessTableFunction<String> {
+  public void eval(
+    @StateHint MapView<String, Integer> largeMemory,
+    @ArgumentHint(TABLE_AS_SET) Row input
+  ) {
+    String eventId = input.getFieldAs("eventId");
+    Integer count = largeMemory.get(eventId);
+    if (count == null) {
+      largeMemory.put(eventId, 1);
+    } else {
+      if (count > 1000) {
+        collect("Anomaly detected: " + eventId);
+      }
+      largeMemory.put(eventId, count + 1);
+    }
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Similar to other data types, reflection is used to extract the necessary type information. If reflection is not
+feasible, such as when a `Row` object is involved, type hints can be provided. Use the `ARRAY` data type for list views
+and the `MAP` data type for map views.
+
+{{< tabs "1937eeed-3d13-455c-8e2f-5e164da9f844" >}}
+{{< tab "Java" >}}
+```java
+// Function that uses a list view of rows
+class LargeHistoryFunction extends ProcessTableFunction<String> {
+  public void eval(
+    @StateHint(type = @DataTypeHint("ARRAY<ROW<s STRING, i INT>>")) ListView<Row> largeMemory,
+    @ArgumentHint(TABLE_AS_SET) Row input
+  ) {
+    ...
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 ### Efficiency and Design Principles
 
 A stateful function also means that data layout and data retention should be well thought
diff --git a/docs/content/docs/dev/table/functions/ptfs.md b/docs/content/docs/dev/table/functions/ptfs.md
index 6c613a115e8..835aeb03350 100644
--- a/docs/content/docs/dev/table/functions/ptfs.md
+++ b/docs/content/docs/dev/table/functions/ptfs.md
@@ -708,6 +708,85 @@ class CountingFunction extends ProcessTableFunction<String> {
 {{< /tab >}}
 {{< /tabs >}}
 
+### Large State
+
+Flink's state backends provide different types of state to efficiently handle large state.
+
+Currently, PTFs support three types of state:
+
+- **Value state**: Represents a single value.
+- **List state**: Represents a list of values, supporting operations like appending, removing, and iterating.
+- **Map state**: Represents a map of key-value pairs for efficient lookups, modifications, and removal of individual entries.
+
+By default, state entries in a PTF are represented as value state. This means that every state entry is fully read from
+the state backend when the evaluation method is called, and the value is written back to the state backend once the
+evaluation method finishes.
+
+To optimize state access and avoid unnecessary (de)serialization, state entries can be declared as:
+- `org.apache.flink.table.api.dataview.ListView` (for list state)
+- `org.apache.flink.table.api.dataview.MapView` (for map state)
+
+These provide direct views to the underlying Flink state backend.
+
+For example, when using a `MapView`, accessing a value via `MapView#get` will only deserialize the value associated with
+the specified key. This allows for efficient access to individual entries without needing to load the entire map. This
+approach is particularly useful when the map does not fit entirely into memory.
+
+{{< hint info >}}
+State TTL is applied individually to each entry in a list or map, allowing for fine-grained expiration control over state
+elements.
+{{< /hint >}}
+
+The following example demonstrates how to declare and use a `MapView`. It assumes the PTF processes a table with the
+schema `(userId, eventId, ...)`, partitioned by `userId`, with a high cardinality of distinct `eventId` values. For this
+use case, it is generally recommended to partition the table by both `userId` and `eventId`. For the purposes of this
+example, the large state is stored as map state.
+
+{{< tabs "1837eeed-3d13-455c-8e2f-5e164da9f844" >}}
+{{< tab "Java" >}}
+```java
+// Function that uses a map view for storing a large map for an event history per user
+class LargeHistoryFunction extends ProcessTableFunction<String> {
+  public void eval(
+    @StateHint MapView<String, Integer> largeMemory,
+    @ArgumentHint(TABLE_AS_SET) Row input
+  ) {
+    String eventId = input.getFieldAs("eventId");
+    Integer count = largeMemory.get(eventId);
+    if (count == null) {
+      largeMemory.put(eventId, 1);
+    } else {
+      if (count > 1000) {
+        collect("Anomaly detected: " + eventId);
+      }
+      largeMemory.put(eventId, count + 1);
+    }
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Similar to other data types, reflection is used to extract the necessary type information. If reflection is not
+feasible, such as when a `Row` object is involved, type hints can be provided. Use the `ARRAY` data type for list views
+and the `MAP` data type for map views.
+
+{{< tabs "1937eeed-3d13-455c-8e2f-5e164da9f844" >}}
+{{< tab "Java" >}}
+```java
+// Function that uses a list view of rows
+class LargeHistoryFunction extends ProcessTableFunction<String> {
+  public void eval(
+    @StateHint(type = @DataTypeHint("ARRAY<ROW<s STRING, i INT>>")) ListView<Row> largeMemory,
+    @ArgumentHint(TABLE_AS_SET) Row input
+  ) {
+    ...
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 ### Efficiency and Design Principles
 
 A stateful function also means that data layout and data retention should be well thought
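
[Editor's illustration, not part of the commit] The second documentation example above elides its body. A minimal sketch of what it might contain, modeled on the ListStateFunction test added later in this commit (function name and row schema are assumptions):

```java
// Hypothetical body for the elided ListView example above.
class LargeHistoryFunction extends ProcessTableFunction<String> {
  public void eval(
      @StateHint(type = @DataTypeHint("ARRAY<ROW<s STRING, i INT>>")) ListView<Row> largeMemory,
      @ArgumentHint(TABLE_AS_SET) Row input) throws Exception {
    // an append goes straight to the backing ListState; the existing
    // list is not deserialized
    largeMemory.add(Row.of(input.getFieldAs("eventId"), 1));
    // iterating materializes entries from state on demand
    for (Row entry : largeMemory.getList()) {
      collect(entry.toString());
    }
  }
}
```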
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
index 9a49d335faa..a3873dcfbd9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
@@ -19,11 +19,11 @@
 package org.apache.flink.table.api.dataview;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.functions.ImperativeAggregateFunction;
+import org.apache.flink.table.functions.ProcessTableFunction;
 
 /**
- * A {@link DataView} is a collection type that can be used in the accumulator of an {@link
- * ImperativeAggregateFunction}.
+ * A {@link DataView} is a collection type that can be used in the accumulator of aggregating
+ * functions and as a state entry in {@link ProcessTableFunction}s.
  *
  * <p>Depending on the context in which the function is used, a {@link DataView} can be backed by a
  * Java heap collection or a state backend.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
index 4249becd126..6cd72abd135 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
@@ -31,14 +31,21 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * A {@link DataView} that provides {@link List}-like functionality in the accumulator of an {@link
- * AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
+ * A {@link DataView} that provides {@link List}-like functionality in state entries.
  *
  * <p>A {@link ListView} can be backed by a Java {@link ArrayList} or can leverage Flink's state
- * backends depending on the context in which the aggregate function is used. In many unbounded data
- * scenarios, the {@link ListView} delegates all calls to a {@link ListState} instead of the {@link
+ * backends depending on the context. In many unbounded data scenarios, the {@link ListView}
+ * delegates all calls to a {@link ListState} instead of the {@link ArrayList}.
+ *
+ * <p>For aggregating functions, the view can be used as a field in the accumulator of an {@link
+ * AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
+ * Aggregate functions might be used at various locations (pre-aggregation, combiners, merging of
+ * window slides, etc.); for some of these locations the data view is not backed by state but by {@link
  * ArrayList}.
  *
+ * <p>For process table functions, the view can be used as a top-level state entry. Data views in
+ * PTFs are always backed by state.
+ *
  * <p>Note: Elements of a {@link ListView} must not be null. For heap-based state backends, {@code
  * hashCode/equals} of the original (i.e. external) class are used. However, the serialization
  * format will use internal data structures.
@@ -57,7 +64,7 @@ import java.util.Objects;
  *    public ListView<String> list = new ListView<>();
  *
  *    // or explicit:
- *    // {@literal @}DataTypeHint("ARRAY<STRING>")
+ *    // @DataTypeHint("ARRAY < STRING >")
  *    // public ListView<String> list = new ListView<>();
  *
  *    public long count = 0L;
@@ -65,7 +72,7 @@ import java.util.Objects;
  *
  *  public class MyAggregateFunction extends AggregateFunction<String, MyAccumulator> {
  *
- *   {@literal @}Override
+ *   @Override
  *   public MyAccumulator createAccumulator() {
  *     return new MyAccumulator();
  *   }
@@ -75,7 +82,7 @@ import java.util.Objects;
  *     accumulator.count++;
  *   }
  *
- *   {@literal @}Override
+ *   @Override
  *   public String getValue(MyAccumulator accumulator) {
  *     // return the count and the joined elements
  *     return accumulator.count + ": " + String.join("|", accumulator.list.get());
@@ -84,9 +91,6 @@ import java.util.Objects;
  *
  * }</pre>
  *
- * <p>{@code ListView(TypeInformation<?> elementType)} method was deprecated and then removed.
- * Please use a {@link DataTypeHint} instead.
- *
  * @param <T> element type
  */
 @PublicEvolving
@@ -152,7 +156,7 @@ public class ListView<T> implements DataView {
         return list.remove(value);
     }
 
-    /** Removes all of the elements from this list view. */
+    /** Removes all elements from this list view. */
     @Override
     public void clear() {
         list.clear();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
index 887d9dfe44f..2bfd5d88127 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
@@ -26,19 +26,27 @@ import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.types.DataType;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 
 /**
- * A {@link DataView} that provides {@link Map}-like functionality in the accumulator of an {@link
- * AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
+ * A {@link DataView} that provides {@link Map}-like functionality in state entries.
  *
  * <p>A {@link MapView} can be backed by a Java {@link HashMap} or can leverage Flink's state
- * backends depending on the context in which the aggregate function is used. In many unbounded data
- * scenarios, the {@link MapView} delegates all calls to a {@link MapState} instead of the {@link
- * HashMap}.
+ * backends depending on the context. In many unbounded data scenarios, the {@link MapView}
+ * delegates all calls to a {@link MapState} instead of the {@link HashMap}.
+ *
+ * <p>For aggregating functions, the view can be used as a field in the accumulator of an {@link
+ * AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
+ * Aggregate functions might be used at various locations (pre-aggregation, combiners, merging of
+ * window slides, etc.); for some of these locations the data view is not backed by state but by
+ * {@link ArrayList}.
+ *
+ * <p>For process table functions, the view can be used as a top-level state entry. Data views in
+ * PTFs are always backed by state.
  *
  * <p>Note: Keys of a {@link MapView} must not be null. Nulls in values are supported. For
  * heap-based state backends, {@code hashCode/equals} of the original (i.e. external) class are
@@ -58,7 +66,7 @@ import java.util.Objects;
  *   public MapView<String, Integer> map = new MapView<>();
  *
  *   // or explicit:
- *   // {@literal @}DataTypeHint("MAP<STRING, INT>")
+ *   // @DataTypeHint("MAP < STRING, INT >")
  *   // public MapView<String, Integer> map = new MapView<>();
  *
  *   public long count;
@@ -66,7 +74,7 @@ import java.util.Objects;
  *
  * public class MyAggregateFunction extends AggregateFunction<Long, MyAccumulator> {
  *
- *  {@literal @}Override
+ *  @Override
  *  public MyAccumulator createAccumulator() {
  *    return new MyAccumulator();
  *  }
@@ -78,7 +86,7 @@ import java.util.Objects;
  *     }
  *   }
  *
- *  {@literal @}Override
+ *   @Override
  *   public Long getValue(MyAccumulator accumulator) {
  *     return accumulator.count;
  *   }
@@ -86,9 +94,6 @@ import java.util.Objects;
  *
  * }</pre>
  *
- * <p>{@code MapView(TypeInformation<?> keyType, TypeInformation<?> valueType)} method was
- * deprecated and removed. Please use a {@link DataTypeHint} instead.
- *
  * @param <K> key type
  * @param <V> value type
  */
@@ -119,8 +124,9 @@ public class MapView<K, V> implements DataView {
     /**
     * Return the value for the specified key or {@code null} if the key is not in the map view.
      *
-     * @param key The look up key.
-     * @return The value for the specified key.
+     * @param key The key whose associated value is to be returned
+     * @return The value to which the specified key is mapped, or {@code null} if this map contains
+     *     no mapping for the key
      * @throws Exception Thrown if the system cannot get data.
      */
     public V get(K key) throws Exception {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
index 2bcfeb9befc..21c92cbfa4c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.annotation.ArgumentTrait;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.FunctionHint;
 import org.apache.flink.table.annotation.StateHint;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
 import org.apache.flink.table.types.inference.TypeInference;
@@ -290,6 +292,54 @@ import java.time.LocalDateTime;
  * }
  * }</pre>
  *
+ * <h2>Large State</h2>
+ *
+ * <p>Flink's state backends provide different types of state to efficiently handle large state.
+ *
+ * <p>Currently, PTFs support three types of state:
+ *
+ * <ul>
+ *   <li><b>Value state</b>: Represents a single value.
+ *   <li><b>List state</b>: Represents a list of values, supporting operations like appending,
+ *       removing, and iterating.
+ *   <li><b>Map state</b>: Represents a map of key-value pairs for efficient lookups, modifications,
+ *       and removal of individual entries.
+ * </ul>
+ *
+ * <p>By default, state entries in a PTF are represented as value state. This means that every state
+ * entry is fully read from the state backend when the evaluation method is called, and the value is
+ * written back to the state backend once the evaluation method finishes.
+ *
+ * <p>To optimize state access and avoid unnecessary (de)serialization, state entries can be
+ * declared as {@link ListView} or {@link MapView}. These provide direct views to the underlying
+ * Flink state backend.
+ *
+ * <p>For example, when using a {@link MapView}, accessing a value via {@link MapView#get(Object)}
+ * will only deserialize the value associated with the specified key. This allows for efficient
+ * access to individual entries without needing to load the entire map. This approach is
+ * particularly useful when the map does not fit entirely into memory.
+ *
+ * <p>State TTL is applied individually to each entry in a list or map, allowing for fine-grained
+ * expiration control over state elements.
+ *
+ * <pre>{@code
+ * // Function that uses a map view for storing a large map for an event history per user
+ * class HistoryFunction extends ProcessTableFunction<String> {
+ *   public void eval(@StateHint MapView<String, Integer> largeMemory, @ArgumentHint(TABLE_AS_SET) Row input) {
+ *     String eventId = input.getFieldAs("eventId");
+ *     Integer count = largeMemory.get(eventId);
+ *     if (count == null) {
+ *       largeMemory.put(eventId, 1);
+ *     } else {
+ *       if (count > 1000) {
+ *         collect("Anomaly detected: " + eventId);
+ *       }
+ *       largeMemory.put(eventId, count + 1);
+ *     }
+ *   }
+ * }
+ * }</pre>
+ *
  * <h1>Time and Timers</h1>
  *
  * <p>A PTF supports event time natively. Time-based services are available via {@link
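
[Editor's illustration, not part of the commit] One detail worth spelling out: data-view state entries participate in the same clearing mechanism as value state, as exercised by the `ctx.clearState("s")` calls in the tests further down in this commit. A minimal hedged sketch:

```java
// Sketch only: clearing a declared MapView entry by name via the PTF context.
public void eval(Context ctx, @StateHint MapView<String, Integer> s,
    @ArgumentHint(TABLE_AS_SET) Row r) throws Exception {
  if (s.isEmpty()) {
    ctx.clearState("s"); // drops the entry for the current partition key
  }
}
```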
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
index 3eea834a3ab..13e13b5a808 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -206,7 +206,8 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
                         .collect(Collectors.toList());
         final GeneratedHashFunction[] stateHashCode =
                 runtimeStateInfos.stream()
-                        .map(RuntimeStateInfo::getType)
+                        .map(RuntimeStateInfo::getDataType)
+                        .map(DataType::getLogicalType)
                         .map(
                                 t ->
                                         HashCodeGenerator.generateRowHash(
@@ -217,7 +218,8 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
                         .toArray(GeneratedHashFunction[]::new);
         final GeneratedRecordEqualiser[] stateEquals =
                 runtimeStateInfos.stream()
-                        .map(RuntimeStateInfo::getType)
+                        .map(RuntimeStateInfo::getDataType)
+                        .map(DataType::getLogicalType)
                         .map(t -> EqualiserCodeGenerator.generateRowEquals(ctx, t, "StateEquals"))
                         .toArray(GeneratedRecordEqualiser[]::new);
 
@@ -315,7 +317,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
             String name, StateInfo stateInfo, ExecNodeConfig config) {
         return new RuntimeStateInfo(
                 name,
-                stateInfo.getDataType().getLogicalType(),
+                stateInfo.getDataType(),
                 deriveStateTimeToLive(
                         stateInfo.getTimeToLive().orElse(null), config.getStateRetentionTime()));
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
index c5064961be4..42914555c81 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
@@ -18,7 +18,9 @@
 package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.functions.OpenContext
+import org.apache.flink.api.common.state.{ListState, MapState}
 import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.dataview.{DataView, ListView, MapView}
 import org.apache.flink.table.connector.ChangelogMode
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.conversion.RowRowConverter
@@ -33,17 +35,20 @@ import org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil.{ver
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext
-import org.apache.flink.table.planner.plan.utils.RexLiteralUtil
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
+import org.apache.flink.table.runtime.dataview.DataViewUtils
+import org.apache.flink.table.runtime.dataview.StateListView.KeyedStateListView
+import org.apache.flink.table.runtime.dataview.StateMapView.KeyedStateMapViewWithKeysNotNull
 import org.apache.flink.table.runtime.generated.{GeneratedProcessTableRunner, ProcessTableRunner}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.extraction.ExtractionUtils
 import org.apache.flink.table.types.inference.{StaticArgument, StaticArgumentTrait, SystemTypeInference, TypeInferenceUtil}
 import org.apache.flink.table.types.inference.TypeInferenceUtil.StateInfo
+import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.types.Row
 
-import org.apache.calcite.rex.{RexCall, RexCallBinding, RexLiteral, RexNode, RexUtil}
+import org.apache.calcite.rex.{RexCall, RexCallBinding, RexNode, RexUtil}
 import org.apache.calcite.sql.SqlKind
 
 import java.util
@@ -116,16 +121,19 @@ object ProcessTableRunnerGenerator {
     val stateDataTypes = stateInfos.asScala.values.map(_.getDataType).toSeq
     stateDataTypes.foreach(ExtractionUtils.checkStateDataType)
 
-    val stateToFunctionTerm = "stateToFunction"
+    val stateHandlesTerm = "stateHandles"
+    val valueStateToFunctionTerm = "valueStateToFunction"
     val stateClearedTerm = "stateCleared"
-    val stateFromFunctionTerm = "stateFromFunction"
-    val externalStateOperands = generateStateToFunction(ctx, stateDataTypes, stateToFunctionTerm)
+    val valueStateFromFunctionTerm = "valueStateFromFunction"
+    val stateEntries = stateInfos.asScala.values.zipWithIndex.toSeq
+    val externalStateOperands =
+      generateStateToFunction(ctx, stateEntries, stateHandlesTerm, valueStateToFunctionTerm)
     val stateFromFunctionCode = generateStateFromFunction(
       ctx,
-      stateDataTypes,
+      stateEntries,
       externalStateOperands,
       stateClearedTerm,
-      stateFromFunctionTerm)
+      valueStateFromFunctionTerm)
 
     // Generate result collector
     val resultCollectorTerm =
@@ -238,62 +246,118 @@ object ProcessTableRunnerGenerator {
 
   private def generateStateToFunction(
       ctx: CodeGeneratorContext,
-      stateDataTypes: Seq[DataType],
-      stateToFunctionTerm: String): Seq[GeneratedExpression] = {
-    stateDataTypes.zipWithIndex
-      .map {
-        case (stateDataType, pos) =>
-          val stateEntryTerm = s"$stateToFunctionTerm[$pos]"
-          val externalStateTypeTerm = typeTerm(stateDataType.getConversionClass)
-          val externalStateTerm = newName(ctx, "externalState")
-
-          val converterCode = genToExternalConverter(ctx, stateDataType, stateEntryTerm)
-
-          val constructorCode = stateDataType.getConversionClass match {
-            case rowType if rowType == classOf[Row] =>
-              // This allows us to retrieve the converter term that has been generated
-              // in genToExternalConverter(). The converter is able to created named positions
-              // for row fields.
-              val converterTerm = ctx.addReusableConverter(stateDataType)
-              s"((${className[RowRowConverter]}) 
$converterTerm).createEmptyRow()"
-            case rowType if rowType == classOf[RowData] =>
-              val fieldCount = LogicalTypeChecks.getFieldCount(stateDataType.getLogicalType)
-              s"new $GENERIC_ROW($fieldCount)"
-            case structuredType @ _ => s"new ${className(structuredType)}()"
-          }
+      stateEntries: Seq[(StateInfo, Int)],
+      stateHandlesTerm: String,
+      valueStateToFunctionTerm: String): Seq[GeneratedExpression] = {
+    stateEntries.map {
+      case (stateInfo, pos) =>
+        val stateDataType = stateInfo.getDataType
+        val stateType = stateDataType.getLogicalType
+        val externalStateTypeTerm = typeTerm(stateDataType.getConversionClass)
+        val externalStateTerm = newName(ctx, "externalState")
+
+        val externalStateCode = if (DataViewUtils.isDataView(stateType, classOf[DataView])) {
+          generateDataViewStateToFunction(
+            ctx,
+            stateHandlesTerm,
+            pos,
+            stateType,
+            externalStateTypeTerm,
+            externalStateTerm)
+          NO_CODE
+        } else {
+          DataViewUtils.checkForInvalidDataViews(stateType)
+          generateValueStateToFunction(
+            ctx,
+            valueStateToFunctionTerm,
+            pos,
+            externalStateTypeTerm,
+            externalStateTerm,
+            stateDataType)
+        }
 
-          val externalStateCode =
-            s"""
-               |final $externalStateTypeTerm $externalStateTerm;
-               |if ($stateEntryTerm == null) {
-               |  $externalStateTerm = $constructorCode;
-               |} else {
-               |  $externalStateTerm = $converterCode;
-               |}
-               |""".stripMargin
+        GeneratedExpression(s"$externalStateTerm", NEVER_NULL, externalStateCode, stateType)
+    }
+  }
 
-          GeneratedExpression(
-            s"$externalStateTerm",
-            NEVER_NULL,
-            externalStateCode,
-            stateDataType.getLogicalType)
+  private def generateDataViewStateToFunction(
+      ctx: CodeGeneratorContext,
+      stateHandlesTerm: String,
+      pos: Int,
+      stateType: LogicalType,
+      externalStateTypeTerm: String,
+      externalStateTerm: String): Unit = {
+    ctx.addReusableMember(s"private $externalStateTypeTerm $externalStateTerm;")
+
+    val (constructor, stateHandleTypeTerm) =
+      if (DataViewUtils.isDataView(stateType, classOf[ListView[_]])) {
+        (className[KeyedStateListView[_, _]], className[ListState[_]])
+      } else if (DataViewUtils.isDataView(stateType, classOf[MapView[_, _]])) {
+        (className[KeyedStateMapViewWithKeysNotNull[_, _, _]], className[MapState[_, _]])
       }
+
+    val openCode =
+      s"""
+         |$externalStateTerm = new $constructor(($stateHandleTypeTerm) $stateHandlesTerm[$pos]);
+         """.stripMargin
+    ctx.addReusableOpenStatement(openCode)
+  }
+
+  private def generateValueStateToFunction(
+      ctx: CodeGeneratorContext,
+      valueStateToFunctionTerm: String,
+      pos: Int,
+      externalStateTypeTerm: String,
+      externalStateTerm: String,
+      stateDataType: DataType): String = {
+    val stateEntryTerm = s"$valueStateToFunctionTerm[$pos]"
+    val converterCode = genToExternalConverter(ctx, stateDataType, stateEntryTerm)
+
+    val constructorCode = stateDataType.getConversionClass match {
+      case rowType if rowType == classOf[Row] =>
+        // This allows us to retrieve the converter term that has been generated
+        // in genToExternalConverter(). The converter is able to create named positions
+        // for row fields.
+        val converterTerm = ctx.addReusableConverter(stateDataType)
+        s"((${className[RowRowConverter]}) $converterTerm).createEmptyRow()"
+      case rowType if rowType == classOf[RowData] =>
+        val fieldCount = LogicalTypeChecks.getFieldCount(stateDataType.getLogicalType)
+        s"new $GENERIC_ROW($fieldCount)"
+      case structuredType @ _ => s"new ${className(structuredType)}()"
+    }
+
+    s"""
+       |final $externalStateTypeTerm $externalStateTerm;
+       |if ($stateEntryTerm == null) {
+       |  $externalStateTerm = $constructorCode;
+       |} else {
+       |  $externalStateTerm = $converterCode;
+       |}
+       |""".stripMargin
   }
 
   private def generateStateFromFunction(
       ctx: CodeGeneratorContext,
-      stateDataTypes: Seq[DataType],
+      stateEntries: Seq[(StateInfo, Int)],
       externalStateOperands: Seq[GeneratedExpression],
       stateClearedTerm: String,
       stateFromFunctionTerm: String): String = {
-    stateDataTypes.zipWithIndex
+    stateEntries
       .map {
-        case (stateDataType, pos) =>
-          val stateEntryTerm = s"$stateFromFunctionTerm[$pos]"
-          val externalStateOperandTerm = externalStateOperands(pos).resultTerm
-          s"$stateEntryTerm = $stateClearedTerm[$pos] ? null : " +
-            s"${genToInternalConverter(ctx, 
stateDataType)(externalStateOperandTerm)};"
+        case (stateInfo, pos) =>
+          val stateDataType = stateInfo.getDataType
+          val stateType = stateDataType.getLogicalType
+
+          if (DataViewUtils.isDataView(stateType, classOf[DataView])) {
+            NO_CODE
+          } else {
+            val stateEntryTerm = s"$stateFromFunctionTerm[$pos]"
+            val externalStateOperandTerm = externalStateOperands(pos).resultTerm
+            s"$stateEntryTerm = $stateClearedTerm[$pos] ? null : " +
+              s"${genToInternalConverter(ctx, 
stateDataType)(externalStateOperandTerm)};"
+          }
       }
+      .filter(c => c != NO_CODE)
       .mkString("\n")
   }
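
[Editor's illustration, not part of the commit] To make the generator change concrete, a rough Java rendering of the open()-time wiring emitted for a ListView entry; this is a sketch inferred from generateDataViewStateToFunction above, not actual generated output, and the field/parameter names are assumptions:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.table.runtime.dataview.StateListView.KeyedStateListView;

// Sketch of the emitted code shape: the view wraps the operator's state
// handle once in open(), so eval() then works directly against Flink state
// without per-call (de)serialization.
class GeneratedRunnerSketch {
    private KeyedStateListView<Object, String> externalState0;

    @SuppressWarnings("unchecked")
    void open(Object[] stateHandles) {
        externalState0 = new KeyedStateListView<>((ListState<String>) stateHandles[0]);
    }
}
```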
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 21925ddc5d5..8c9404de7b9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -38,11 +38,10 @@ import org.apache.flink.table.planner.plan.`trait`.{ModifyKindSetTrait, ModifyKi
 import org.apache.flink.table.planner.plan.logical.{HoppingWindowSpec, WindowSpec}
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
-import org.apache.flink.table.planner.typeutils.DataViewUtils
 import org.apache.flink.table.planner.typeutils.LegacyDataViewUtils.useNullSerializerForStateViewFieldsFromAccType
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory
-import org.apache.flink.table.runtime.dataview.DataViewSpec
+import org.apache.flink.table.runtime.dataview.{DataViewSpec, DataViewUtils}
 import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction
 import org.apache.flink.table.runtime.groupwindow._
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/aggregation.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/aggregation.scala
index 76aa5b28085..1001e38e645 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/aggregation.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/aggregation.scala
@@ -18,8 +18,8 @@
 package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.functions.UserDefinedFunction
-import org.apache.flink.table.planner.typeutils.DataViewUtils.DistinctViewSpec
 import org.apache.flink.table.runtime.dataview.DataViewSpec
+import org.apache.flink.table.runtime.dataview.DataViewUtils.DistinctViewSpec
 import org.apache.flink.table.types.DataType
 
 import org.apache.calcite.rel.core.AggregateCall
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
index d571d753885..e2de3f8dba1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerdeTest.PojoClass;
-import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.dataview.DataViewUtils;
 import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index ef35001d0de..8feb59a1678 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -72,6 +72,8 @@ public class ProcessTableFunctionSemanticTests extends SemanticTestBase {
                 ProcessTableFunctionTestPrograms.PROCESS_CHAINED_TIME_TABLE_API,
                 ProcessTableFunctionTestPrograms.PROCESS_INVALID_TABLE_AS_ROW_TIMERS,
                 ProcessTableFunctionTestPrograms.PROCESS_INVALID_PASS_THROUGH_TIMERS,
-                ProcessTableFunctionTestPrograms.PROCESS_INVALID_UPDATING_TIMERS);
+                ProcessTableFunctionTestPrograms.PROCESS_INVALID_UPDATING_TIMERS,
+                ProcessTableFunctionTestPrograms.PROCESS_LIST_STATE,
+                ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 4bf600228c9..a2396a80e76 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -32,6 +32,8 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidTableAsRowTimersFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidUpdatingTimersFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.LateTimersFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ListStateFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MapStateFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiStateFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NamedTimersFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.OptionalOnTimeFunction;
@@ -960,4 +962,38 @@ public class ProcessTableFunctionTestPrograms {
                             TableRuntimeException.class,
                             "Timers are not supported in the current PTF 
declaration.")
                     .build();
+
+    public static final TableTestProgram PROCESS_LIST_STATE =
+            TableTestProgram.of("process-list-state", "list view state entry")
+                    .setupTemporarySystemFunction("f", ListStateFunction.class)
+                    .setupSql(MULTI_VALUES)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(KEYED_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[Bob, {[], KeyedStateListView, +I[Bob, 12]}]",
+                                            "+I[Alice, {[], KeyedStateListView, +I[Alice, 42]}]",
+                                            "+I[Bob, {[0], KeyedStateListView, +I[Bob, 99]}]",
+                                            "+I[Bob, {[0, 1], KeyedStateListView, +I[Bob, 100]}]",
+                                            "+I[Alice, {[0], KeyedStateListView, +I[Alice, 400]}]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name)")
+                    .build();
+
+    public static final TableTestProgram PROCESS_MAP_STATE =
+            TableTestProgram.of("process-map-state", "map view state entry")
+                    .setupTemporarySystemFunction("f", MapStateFunction.class)
+                    .setupSql(MULTI_VALUES)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(KEYED_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[Bob, {{}, KeyedStateMapViewWithKeysNotNull, +I[Bob, 12]}]",
+                                            "+I[Alice, {{}, KeyedStateMapViewWithKeysNotNull, +I[Alice, 42]}]",
+                                            "+I[Bob, {{Bob=2, nullValue=null, oldBob=1}, KeyedStateMapViewWithKeysNotNull, +I[Bob, 99]}]",
+                                            "+I[Bob, {{}, KeyedStateMapViewWithKeysNotNull, +I[Bob, 100]}]",
+                                            "+I[Alice, {{Alice=2, nullValue=null, oldAlice=1}, KeyedStateMapViewWithKeysNotNull, +I[Alice, 400]}]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name)")
+                    .build();
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index 036caa43571..4e526d07fd9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -22,6 +22,9 @@ import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.ArgumentTrait;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.StateHint;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableSemantics;
@@ -32,6 +35,11 @@ import org.apache.flink.types.Row;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
 import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
@@ -39,6 +47,7 @@ import static org.apache.flink.table.annotation.ArgumentTrait.REQUIRE_ON_TIME;
 import static org.apache.flink.table.annotation.ArgumentTrait.SUPPORT_UPDATES;
 import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW;
 import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Testing functions for {@link ProcessTableFunction}. */
 @SuppressWarnings("unused")
@@ -344,9 +353,9 @@ public class ProcessTableFunctionTestUtils {
                 collect(
                         String.format(
                                 "s1=%s, s2=%s, s3=%s",
-                                internalContext.getValueStateDescriptor("s1").getTtlConfig(),
-                                internalContext.getValueStateDescriptor("s2").getTtlConfig(),
-                                internalContext.getValueStateDescriptor("s3").getTtlConfig()));
+                                internalContext.getStateDescriptor("s1").getTtlConfig(),
+                                internalContext.getStateDescriptor("s2").getTtlConfig(),
+                                internalContext.getStateDescriptor("s3").getTtlConfig()));
                 s0.setField("emitted", true);
             }
         }
@@ -605,6 +614,80 @@ public class ProcessTableFunctionTestUtils {
         }
     }
 
+    /** Testing function. */
+    public static class ListStateFunction extends TestProcessTableFunctionBase {
+        public void eval(
+                Context ctx,
+                @StateHint ListView<String> s,
+                @ArgumentHint({TABLE_AS_SET, OPTIONAL_PARTITION_BY}) Row r)
+                throws Exception {
+            collectObjects(s.getList(), s.getClass().getSimpleName(), r);
+
+            // get
+            int count = s.getList().size();
+
+            // create
+            s.add(String.valueOf(count));
+
+            // null behavior
+            assertThatThrownBy(() -> s.add(null))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("List views don't support null values.");
+            assertThatThrownBy(() -> s.addAll(Arrays.asList("item0", null)))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("List views don't support null values.");
+
+            // clear
+            if (count == 2) {
+                ctx.clearState("s");
+            }
+        }
+    }
+
+    /** Testing function. */
+    public static class MapStateFunction extends TestProcessTableFunctionBase {
+        public void eval(
+                Context ctx,
+                @StateHint MapView<String, Integer> s,
+                @ArgumentHint({TABLE_AS_SET, OPTIONAL_PARTITION_BY}) Row r)
+                throws Exception {
+            final String viewToString =
+                    s.getMap().entrySet().stream()
+                            .map(Objects::toString)
+                            .sorted()
+                            .collect(Collectors.joining(", ", "{", "}"));
+            collectObjects(viewToString, s.getClass().getSimpleName(), r);
+
+            // get
+            final String name = r.getFieldAs("name");
+            int count = 1;
+            if (s.contains(name)) {
+                count = s.get(name);
+            }
+
+            // create
+            s.put("old" + name, count);
+            s.put(name, count + 1);
+
+            // null behavior
+            assertThatThrownBy(() -> s.put(null, 42))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Map views don't support null keys.");
+            final Map<String, Integer> mapWithNull = new HashMap<>();
+            mapWithNull.put("key", 42);
+            mapWithNull.put(null, 42);
+            assertThatThrownBy(() -> s.putAll(mapWithNull))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Map views don't support null keys.");
+            s.put("nullValue", null);
+
+            // clear
+            if (count == 2) {
+                ctx.clearState("s");
+            }
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
     // Helpers
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/DataViewUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/DataViewUtils.java
similarity index 81%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/DataViewUtils.java
rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/DataViewUtils.java
index db8d29abc48..94fa2a1c2b5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/DataViewUtils.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/DataViewUtils.java
@@ -16,20 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.typeutils;
+package org.apache.flink.table.runtime.dataview;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.dataview.DataView;
 import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.data.binary.LazyBinaryFormat;
 import org.apache.flink.table.dataview.NullSerializer;
-import org.apache.flink.table.runtime.dataview.DataViewSpec;
-import org.apache.flink.table.runtime.dataview.ListViewSpec;
-import org.apache.flink.table.runtime.dataview.MapViewSpec;
 import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.TypeTransformation;
@@ -51,13 +48,40 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasNe
 /**
  * Utilities to deal with {@link DataView}s.
  *
- * <p>A {@link DataView} is either represented as a regular {@link StructuredType} or as a {@link
- * RawType} that serializes to {@code null} when backed by a state backend. In the latter case, a
- * {@link DataViewSpec} contains all information necessary to store and retrieve data from state.
+ * <p>For aggregating functions: A {@link DataView} is a field that is either represented as a
+ * regular {@link StructuredType} or as a {@link RawType} that serializes to {@code null} when
+ * backed by a state backend. In the latter case, a {@link DataViewSpec} contains all information
+ * necessary to store and retrieve data from state.
+ *
+ * <p>For process table functions: A {@link DataView} is a top-level instance that is always backed
+ * by a state backend.
  */
 @Internal
 public final class DataViewUtils {
 
+    /** Returns whether the given {@link LogicalType} qualifies as a {@link DataView}. */
+    public static boolean isDataView(LogicalType viewType, Class<? extends DataView> viewClass) {
+        final boolean isDataView =
+                viewType.is(STRUCTURED_TYPE)
+                        && ((StructuredType) viewType)
+                                .getImplementationClass()
+                                .map(viewClass::isAssignableFrom)
+                                .orElse(false);
+        if (!isDataView) {
+            return false;
+        }
+        viewType.getChildren().forEach(DataViewUtils::checkForInvalidDataViews);
+        return true;
+    }
+
+    /** Checks that the given type and its children do not contain data views. */
+    public static void checkForInvalidDataViews(LogicalType type) {
+        if (hasNested(type, t -> isDataView(t, DataView.class))) {
+            throw new ValidationException(
+                    "Data views are not supported at the declared location. Given type: " + type);
+        }
+    }
+
     /** Searches for data views in the data type of an accumulator and extracts them. */
     public static List<DataViewSpec> extractDataViews(int aggIndex, DataType accumulatorDataType) {
         final LogicalType accumulatorType = accumulatorDataType.getLogicalType();
@@ -85,11 +109,6 @@ public final class DataViewUtils {
                                 fieldDataType.getChildren().get(0),
                                 false));
             }
-            if (fieldType.getChildren().stream()
-                    .anyMatch(c -> hasNested(c, t -> isDataView(t, DataView.class)))) {
-                throw new TableException(
-                        "Data views are only supported in the first level of a composite accumulator type.");
-            }
         }
         return specs;
     }
@@ -138,14 +157,6 @@ public final class DataViewUtils {
         return "agg" + fieldIndex + "$" + fieldName;
     }
 
-    private static boolean isDataView(LogicalType t, Class<? extends DataView> viewClass) {
-        return t.is(STRUCTURED_TYPE)
-                && ((StructuredType) t)
-                        .getImplementationClass()
-                        .map(viewClass::isAssignableFrom)
-                        .orElse(false);
-    }
-
     // --------------------------------------------------------------------------------------------
 
     private static class DataViewsTransformation implements TypeTransformation {
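
[Editor's illustration, not part of the commit] A Java paraphrase of the per-entry dispatch that now consumes these utilities in the planner (cf. ProcessTableRunnerGenerator above); class and method names here are hypothetical:

```java
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.runtime.dataview.DataViewUtils;
import org.apache.flink.table.types.logical.LogicalType;

// Sketch: classify a PTF state entry the way the code generator does.
final class StateEntryClassifier {
    static String classify(LogicalType stateType) {
        if (DataViewUtils.isDataView(stateType, ListView.class)) {
            return "list state"; // backed by a KeyedStateListView
        } else if (DataViewUtils.isDataView(stateType, MapView.class)) {
            return "map state";  // backed by a KeyedStateMapViewWithKeysNotNull
        }
        // value state: reject data views nested anywhere inside the type
        DataViewUtils.checkForInvalidDataViews(stateType);
        return "value state";
    }
}
```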
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java
index 8b2b4ac95ee..8aa0d0149c3 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.dataview;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.table.api.dataview.ListView;
 
 import java.util.ArrayList;
@@ -44,7 +45,7 @@ public abstract class StateListView<N, EE> extends ListView<EE> implements State
         try {
             get().forEach(list::add);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Unable to collect list.", e);
         }
         return list;
     }
@@ -54,8 +55,10 @@ public abstract class StateListView<N, EE> extends ListView<EE> implements State
         clear();
         try {
             addAll(list);
+        } catch (TableRuntimeException e) {
+            throw e;
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Unable to replace list.", e);
         }
     }
 
@@ -67,16 +70,19 @@ public abstract class StateListView<N, EE> extends ListView<EE> implements State
 
     @Override
     public void add(EE value) throws Exception {
+        checkValue(value);
         getListState().add(value);
     }
 
     @Override
     public void addAll(List<EE> list) throws Exception {
+        checkList(list);
         getListState().addAll(list);
     }
 
     @Override
     public boolean remove(EE value) throws Exception {
+        checkValue(value);
         Iterable<EE> iterable = getListState().get();
         if (iterable == null) {
             // ListState.get() may return null according to the Javadoc.
@@ -152,4 +158,16 @@ public abstract class StateListView<N, EE> extends ListView<EE> implements State
             return listState;
         }
     }
+
+    private static void checkValue(Object value) {
+        if (value == null) {
+            throw new TableRuntimeException("List views don't support null values.");
+        }
+    }
+
+    private static void checkList(List<?> list) {
+        if (list.contains(null)) {
+            throw new TableRuntimeException("List views don't support null values.");
+        }
+    }
 }
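
The eager null checks above surface as TableRuntimeException inside a PTF; a minimal sketch of list state usage (function and column names are illustrative):

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Sketch: appends each non-null name to list state backed by StateListView.
class HistoryFunction extends ProcessTableFunction<String> {
    public void eval(
            @StateHint ListView<String> history,
            @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input)
            throws Exception {
        final String name = input.getFieldAs("name");
        if (name != null) {
            // history.add(null) would fail fast with
            // TableRuntimeException("List views don't support null values.")
            history.add(name);
        }
        collect("seen: " + history.getList().size());
    }
}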
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
index e7df0e53c34..b0ac947bd8d 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.util.IterableIterator;
 
@@ -48,7 +49,7 @@ public abstract class StateMapView<N, EK, EV> extends MapView<EK, EV> implements
         try {
             entries().forEach(entry -> map.put(entry.getKey(), entry.getValue()));
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Unable to collect map.", e);
         }
         return map;
     }
@@ -58,8 +59,10 @@ public abstract class StateMapView<N, EK, EV> extends MapView<EK, EV> implements
         clear();
         try {
             putAll(map);
+        } catch (TableRuntimeException e) {
+            throw e;
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Unable to replace map.", e);
         }
     }
 
@@ -81,26 +84,31 @@ public abstract class StateMapView<N, EK, EV> extends MapView<EK, EV> implements
 
         @Override
         public EV get(EK key) throws Exception {
+            checkKey(key);
             return getMapState().get(key);
         }
 
         @Override
         public void put(EK key, EV value) throws Exception {
+            checkKey(key);
             getMapState().put(key, value);
         }
 
         @Override
         public void putAll(Map<EK, EV> map) throws Exception {
+            checkMap(map);
             getMapState().putAll(map);
         }
 
         @Override
         public void remove(EK key) throws Exception {
+            checkKey(key);
             getMapState().remove(key);
         }
 
         @Override
         public boolean contains(EK key) throws Exception {
+            checkKey(key);
             return getMapState().contains(key);
         }
 
@@ -137,6 +145,18 @@ public abstract class StateMapView<N, EK, EV> extends MapView<EK, EV> implements
         public void clear() {
             getMapState().clear();
         }
+
+        private static void checkKey(Object key) {
+            if (key == null) {
+                throw new TableRuntimeException("Map views don't support null keys.");
+            }
+        }
+
+        private static void checkMap(Map<?, ?> map) {
+            if (map.containsKey(null)) {
+                throw new TableRuntimeException("Map views don't support null keys.");
+            }
+        }
     }
 
     /**
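
Analogously for map state, where only the touched entry is (de)serialized; a minimal sketch (names are illustrative):

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Sketch: counts occurrences per name using map state backed by StateMapView.
class CountPerNameFunction extends ProcessTableFunction<String> {
    public void eval(
            @StateHint MapView<String, Integer> counts,
            @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input)
            throws Exception {
        final String name = input.getFieldAs("name");
        if (name == null) {
            // a null key would fail with
            // TableRuntimeException("Map views don't support null keys.")
            return;
        }
        // MapView#get only deserializes the value stored under this key.
        final Integer count = counts.get(name);
        counts.put(name, count == null ? 1 : count + 1);
        collect(name + ": " + counts.get(name));
    }
}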
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
index 3136cefbcad..f285f0c3d46 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -45,7 +46,7 @@ import java.util.Arrays;
 public abstract class ProcessTableRunner extends AbstractRichFunction {
 
     // Constant references after initialization
-    private ValueState<RowData>[] stateHandles;
+    protected State[] stateHandles;
     private HashFunction[] stateHashCode;
     private RecordEqualiser[] stateEquals;
     private boolean emitRowtime;
@@ -68,7 +69,7 @@ public abstract class ProcessTableRunner extends AbstractRichFunction {
     private @Nullable StringData timerName;
 
     /** State entries to be converted into external data structure; null if state is empty. */
-    protected RowData[] stateToFunction;
+    protected RowData[] valueStateToFunction;
 
     /**
      * Reference to whether the state has been cleared within the function; if yes, a conversion
@@ -77,10 +78,10 @@ public abstract class ProcessTableRunner extends 
AbstractRichFunction {
     protected boolean[] stateCleared;
 
     /** State ready for persistence; null if {@link #stateCleared} was true during conversion. */
-    protected RowData[] stateFromFunction;
+    protected RowData[] valueStateFromFunction;
 
     public void initialize(
-            ValueState<RowData>[] stateHandles,
+            State[] stateHandles,
             HashFunction[] stateHashCode,
             RecordEqualiser[] stateEquals,
             boolean emitRowtime,
@@ -98,9 +99,9 @@ public abstract class ProcessTableRunner extends AbstractRichFunction {
         this.runnerOnTimerContext = runnerOnTimerContext;
         this.evalCollector = evalCollector;
         this.onTimerCollector = onTimerCollector;
-        this.stateToFunction = new RowData[stateHandles.length];
+        this.valueStateToFunction = new RowData[stateHandles.length];
         this.stateCleared = new boolean[stateHandles.length];
-        this.stateFromFunction = new RowData[stateHandles.length];
+        this.valueStateFromFunction = new RowData[stateHandles.length];
     }
 
     public void ingestTableEvent(int pos, RowData row, int timeColumn) {
@@ -183,34 +184,52 @@ public abstract class ProcessTableRunner extends AbstractRichFunction {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void moveStateToFunction() throws IOException {
         Arrays.fill(stateCleared, false);
         for (int i = 0; i < stateHandles.length; i++) {
-            final RowData value = stateHandles[i].value();
-            stateToFunction[i] = value;
+            final State stateHandle = stateHandles[i];
+            if (!(stateHandle instanceof ValueState)) {
+                continue;
+            }
+            final ValueState<RowData> valueState = (ValueState<RowData>) stateHandle;
+            final RowData value = valueState.value();
+            valueStateToFunction[i] = value;
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void moveStateFromFunction() throws IOException {
         for (int i = 0; i < stateHandles.length; i++) {
-            final RowData fromFunction = stateFromFunction[i];
-            if (fromFunction == null || isEmpty(fromFunction)) {
-                // Reduce state size
-                stateHandles[i].clear();
+            final State stateHandle = stateHandles[i];
+            if (stateHandle instanceof ValueState) {
+                moveValueStateFromFunction((ValueState<RowData>) stateHandle, i);
             } else {
-                final HashFunction hashCode = stateHashCode[i];
-                final RecordEqualiser equals = stateEquals[i];
-                final RowData toFunction = stateToFunction[i];
-                // Reduce state updates by checking if something has changed
-                if (toFunction == null
-                        || hashCode.hashCode(toFunction) != hashCode.hashCode(fromFunction)
-                        || !equals.equals(toFunction, fromFunction)) {
-                    stateHandles[i].update(fromFunction);
+                if (stateCleared[i]) {
+                    stateHandle.clear();
                 }
             }
         }
     }
 
+    private void moveValueStateFromFunction(ValueState<RowData> valueState, int pos)
+            throws IOException {
+        final RowData fromFunction = valueStateFromFunction[pos];
+        if (fromFunction == null || isEmpty(fromFunction)) {
+            valueState.clear();
+        } else {
+            final HashFunction hashCode = stateHashCode[pos];
+            final RecordEqualiser equals = stateEquals[pos];
+            final RowData toFunction = valueStateToFunction[pos];
+            // Reduce state updates by checking if something has changed
+            if (toFunction == null
+                    || hashCode.hashCode(toFunction) != hashCode.hashCode(fromFunction)
+                    || !equals.equals(toFunction, fromFunction)) {
+                valueState.update(fromFunction);
+            }
+        }
+    }
+
     private static boolean isEmpty(RowData row) {
         for (int i = 0; i < row.getArity(); i++) {
             if (!row.isNullAt(i)) {
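
The skip-update logic in moveValueStateFromFunction reduces to a small predicate; a standalone sketch (class and method names are illustrative):

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.HashFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;

final class ValueStateUpdateSketch {
    // Persist only when no prior value existed or the value actually changed;
    // the cheap hash comparison short-circuits the full record equality check.
    static boolean needsUpdate(
            RowData toFunction,
            RowData fromFunction,
            HashFunction hashCode,
            RecordEqualiser equals) {
        return toFunction == null
                || hashCode.hashCode(toFunction) != hashCode.hashCode(fromFunction)
                || !equals.equals(toFunction, fromFunction);
    }
}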
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java
index 30ba25d82aa..cc3feffcd1e 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java
@@ -23,10 +23,12 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.DefaultOpenContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -40,20 +42,27 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.functions.ProcessTableFunction.TimeContext;
 import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.runtime.dataview.DataViewUtils;
 import org.apache.flink.table.runtime.generated.HashFunction;
 import org.apache.flink.table.runtime.generated.ProcessTableRunner;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.operators.process.TimeConverter.InstantTimeConverter;
 import org.apache.flink.table.runtime.operators.process.TimeConverter.LocalDateTimeConverter;
 import org.apache.flink.table.runtime.operators.process.TimeConverter.LongTimeConverter;
+import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.StringDataSerializer;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 
@@ -79,8 +88,8 @@ public class ProcessTableOperator extends AbstractStreamOperator<RowData>
     private transient ReadableInternalTimeContext internalTimeContext;
     private transient PassThroughCollectorBase evalCollector;
     private transient PassAllCollector onTimerCollector;
-    private transient ValueStateDescriptor<RowData>[] stateDescriptors;
-    private transient ValueState<RowData>[] stateHandles;
+    private transient StateDescriptor<?, ?>[] stateDescriptors;
+    private transient State[] stateHandles;
 
     private transient @Nullable MapState<StringData, Long> namedTimersMapState;
     private transient @Nullable InternalTimerService<StringData> namedTimerService;
@@ -241,7 +250,7 @@ public class ProcessTableOperator extends AbstractStreamOperator<RowData>
         }
 
         @VisibleForTesting
-        public ValueStateDescriptor<RowData> getValueStateDescriptor(String stateName) {
+        public StateDescriptor<?, ?> getStateDescriptor(String stateName) {
             final Integer statePos = stateNameToPosMap.get(stateName);
             if (statePos == null) {
                 throw new TableRuntimeException("Unknown state entry: " + 
stateName);
@@ -311,16 +320,37 @@ public class ProcessTableOperator extends 
AbstractStreamOperator<RowData>
         onTimerCollector = new PassAllCollector(output);
     }
 
-    @SuppressWarnings("unchecked")
     private void setStateDescriptors() {
-        final ValueStateDescriptor<RowData>[] stateDescriptors =
-                new ValueStateDescriptor[stateInfos.size()];
+        final StateDescriptor<?, ?>[] stateDescriptors = new StateDescriptor[stateInfos.size()];
         for (int i = 0; i < stateInfos.size(); i++) {
             final RuntimeStateInfo stateInfo = stateInfos.get(i);
-            final LogicalType type = stateInfo.getType();
-            final ValueStateDescriptor<RowData> stateDescriptor =
-                    new ValueStateDescriptor<>(
-                            stateInfo.getStateName(), InternalSerializers.create(type));
+            final DataType dataType = stateInfo.getDataType();
+            final LogicalType type = dataType.getLogicalType();
+            final String stateName = stateInfo.getStateName();
+
+            final StateDescriptor<?, ?> stateDescriptor;
+            if (DataViewUtils.isDataView(type, ListView.class)) {
+                final CollectionDataType arrayDataType =
+                        (CollectionDataType) dataType.getChildren().get(0);
+                final DataType elementDataType = arrayDataType.getElementDataType();
+                stateDescriptor =
+                        new ListStateDescriptor<>(
+                                stateName, ExternalSerializer.of(elementDataType));
+            } else if (DataViewUtils.isDataView(type, MapView.class)) {
+                final KeyValueDataType mapDataType =
+                        (KeyValueDataType) dataType.getChildren().get(0);
+                final DataType keyDataType = mapDataType.getKeyDataType();
+                final DataType valueDataType = mapDataType.getValueDataType();
+                stateDescriptor =
+                        new MapStateDescriptor<>(
+                                stateName,
+                                ExternalSerializer.of(keyDataType),
+                                ExternalSerializer.of(valueDataType));
+            } else {
+                stateDescriptor =
+                        new ValueStateDescriptor<>(stateName, InternalSerializers.create(type));
+            }
+
             final StateTtlConfig ttlConfig =
                     StateConfigUtil.createTtlConfig(stateInfo.getTimeToLive());
             if (ttlConfig.isEnabled()) {
@@ -331,12 +361,24 @@ public class ProcessTableOperator extends AbstractStreamOperator<RowData>
         this.stateDescriptors = stateDescriptors;
     }
 
-    @SuppressWarnings("unchecked")
     private void setStateHandles() {
         final KeyedStateStore keyedStateStore = getKeyedStateStore();
-        final ValueState<RowData>[] stateHandles = new ValueState[stateDescriptors.length];
-        for (int i = 0; i < stateInfos.size(); i++) {
-            stateHandles[i] = keyedStateStore.getState(stateDescriptors[i]);
+        final State[] stateHandles = new State[stateDescriptors.length];
+        for (int i = 0; i < stateDescriptors.length; i++) {
+            final StateDescriptor<?, ?> stateDescriptor = stateDescriptors[i];
+            final State stateHandle;
+            if (stateDescriptor instanceof ValueStateDescriptor) {
+                stateHandle = keyedStateStore.getState((ValueStateDescriptor<?>) stateDescriptor);
+            } else if (stateDescriptor instanceof ListStateDescriptor) {
+                stateHandle =
+                        keyedStateStore.getListState((ListStateDescriptor<?>) stateDescriptor);
+            } else if (stateDescriptor instanceof MapStateDescriptor) {
+                stateHandle =
+                        keyedStateStore.getMapState((MapStateDescriptor<?, ?>) stateDescriptor);
+            } else {
+                throw new IllegalStateException("Unknown state descriptor: " + stateDescriptor);
+            }
+            stateHandles[i] = stateHandle;
         }
         this.stateHandles = stateHandles;
     }
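
For reference, a view's DataType wraps a single collection child, which is what setStateDescriptors() unwraps to build the serializers; a sketch assuming the ListView.newListViewDataType helper used for aggregate accumulators:

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;

final class ViewSerializerSketch {
    static TypeSerializer<?> listElementSerializer() {
        // ListView<String> is modeled as a structured type with one ARRAY<STRING> child.
        final DataType listViewType = ListView.newListViewDataType(DataTypes.STRING());
        final CollectionDataType arrayType =
                (CollectionDataType) listViewType.getChildren().get(0);
        // External serializer, because view elements cross the external/internal boundary.
        return ExternalSerializer.of(arrayType.getElementDataType());
    }
}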
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeStateInfo.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeStateInfo.java
index e616ecf5dee..5f983e32ff9 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeStateInfo.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeStateInfo.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.runtime.operators.process;
 
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.TypeInferenceUtil.StateInfo;
-import org.apache.flink.table.types.logical.LogicalType;
 
 import java.io.Serializable;
 
@@ -32,12 +32,12 @@ public class RuntimeStateInfo implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String stateName;
-    private final LogicalType type;
+    private final DataType dataType;
     private final long timeToLive;
 
-    public RuntimeStateInfo(String stateName, LogicalType type, long timeToLive) {
+    public RuntimeStateInfo(String stateName, DataType dataType, long timeToLive) {
         this.stateName = stateName;
-        this.type = type;
+        this.dataType = dataType;
         this.timeToLive = timeToLive;
     }
 
@@ -45,8 +45,8 @@ public class RuntimeStateInfo implements Serializable {
         return stateName;
     }
 
-    public LogicalType getType() {
-        return type;
+    public DataType getDataType() {
+        return dataType;
     }
 
     public long getTimeToLive() {
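
With DataType instead of LogicalType, the runtime state info carries enough detail for the operator to choose between value, list, and map descriptors; a hypothetical construction (assuming a non-positive TTL disables state retention):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.runtime.operators.process.RuntimeStateInfo;
import org.apache.flink.table.types.DataType;

final class StateInfoSketch {
    static RuntimeStateInfo countsStateInfo() {
        // MapView<String, Integer> entry named "counts"; 0 = no time-to-live (assumption).
        final DataType mapViewType =
                MapView.newMapViewDataType(DataTypes.STRING(), DataTypes.INT());
        return new RuntimeStateInfo("counts", mapViewType, 0L);
    }
}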
